Skip to main content

grafeo_engine/session/
mod.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7#[cfg(feature = "triple-store")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::AtomicUsize;
12#[cfg(feature = "lpg")]
13use std::sync::atomic::Ordering;
14use std::time::{Duration, Instant};
15
16#[cfg(feature = "lpg")]
17use grafeo_common::grafeo_debug_span;
18#[cfg(feature = "lpg")]
19use grafeo_common::types::{EdgeId, NodeId};
20use grafeo_common::types::{EpochId, TransactionId, Value};
21use grafeo_common::utils::error::Result;
22use grafeo_common::{grafeo_info_span, grafeo_warn};
23#[cfg(feature = "lpg")]
24use grafeo_core::graph::Direction;
25#[cfg(feature = "lpg")]
26use grafeo_core::graph::lpg::LpgStore;
27#[cfg(feature = "lpg")]
28use grafeo_core::graph::lpg::{Edge, Node};
29#[cfg(feature = "triple-store")]
30use grafeo_core::graph::rdf::RdfStore;
31use grafeo_core::graph::{GraphStore, GraphStoreMut, GraphStoreSearch};
32
33use crate::catalog::{Catalog, CatalogConstraintValidator};
34use crate::config::{AdaptiveConfig, GraphModel};
35use crate::database::QueryResult;
36use crate::query::Executor;
37use crate::query::cache::QueryCache;
38use crate::transaction::TransactionManager;
39
40/// Storage key suffix for the implicit default graph within a schema.
41/// Auto-created by `CREATE SCHEMA` and auto-dropped by `DROP SCHEMA`.
42const SCHEMA_DEFAULT_GRAPH: &str = "__default__";
43
44/// Parses a DDL default-value literal string into a [`Value`].
45///
46/// Handles string literals (single- or double-quoted), integers, floats,
47/// booleans (`true`/`false`), and `NULL`.
48fn parse_default_literal(text: &str) -> Value {
49    if text.eq_ignore_ascii_case("null") {
50        return Value::Null;
51    }
52    if text.eq_ignore_ascii_case("true") {
53        return Value::Bool(true);
54    }
55    if text.eq_ignore_ascii_case("false") {
56        return Value::Bool(false);
57    }
58    // String literal: strip surrounding quotes
59    if (text.starts_with('\'') && text.ends_with('\''))
60        || (text.starts_with('"') && text.ends_with('"'))
61    {
62        return Value::String(text[1..text.len() - 1].into());
63    }
64    // Try integer, then float
65    if let Ok(i) = text.parse::<i64>() {
66        return Value::Int64(i);
67    }
68    if let Ok(f) = text.parse::<f64>() {
69        return Value::Float64(f);
70    }
71    // Fallback: treat as string
72    Value::String(text.into())
73}
74
75/// Runtime configuration for creating a new session.
76///
77/// Groups the shared parameters passed to all session constructors, keeping
78/// call sites readable and avoiding long argument lists.
79pub(crate) struct SessionConfig {
80    pub transaction_manager: Arc<TransactionManager>,
81    pub query_cache: Arc<QueryCache>,
82    pub catalog: Arc<Catalog>,
83    pub adaptive_config: AdaptiveConfig,
84    pub factorized_execution: bool,
85    pub graph_model: GraphModel,
86    pub query_timeout: Option<Duration>,
87    pub max_property_size: Option<usize>,
88    /// Buffer manager for memory-aware query execution.
89    pub buffer_manager: Option<Arc<grafeo_common::memory::buffer::BufferManager>>,
90    pub commit_counter: Arc<AtomicUsize>,
91    pub gc_interval: usize,
92    /// When true, the session permanently blocks all mutations.
93    pub read_only: bool,
94    /// The identity bound to this session (for permission checks).
95    pub identity: crate::auth::Identity,
96    /// Named graph projections shared with the database.
97    #[cfg(feature = "lpg")]
98    pub projections: Arc<
99        parking_lot::RwLock<
100            std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>,
101        >,
102    >,
103}
104
105/// Your handle to the database - execute queries and manage transactions.
106///
107/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
108/// tracks its own transaction state, so you can have multiple concurrent
109/// sessions without them interfering.
110pub struct Session {
111    /// The underlying store.
112    #[cfg(feature = "lpg")]
113    store: Arc<LpgStore>,
114    /// Classifies the role of `store` for the active backend.
115    /// Search procedures (CALL grafeo.search.*) only reach into `store` when
116    /// this is `Active`. External-store sessions keep `store` as a placeholder
117    /// and must not expose it: it has no indexes or data.
118    #[cfg(feature = "lpg")]
119    lpg_backend: LpgBackend,
120    /// Graph store trait object for pluggable storage backends (read path).
121    graph_store: Arc<dyn GraphStoreSearch>,
122    /// Writable graph store (None for read-only databases).
123    graph_store_mut: Option<Arc<dyn GraphStoreMut>>,
124    /// Schema and metadata catalog shared across sessions.
125    catalog: Arc<Catalog>,
126    /// RDF triple store (if RDF feature is enabled).
127    #[cfg(feature = "triple-store")]
128    rdf_store: Arc<RdfStore>,
129    /// Transaction manager.
130    transaction_manager: Arc<TransactionManager>,
131    /// Query cache shared across sessions.
132    query_cache: Arc<QueryCache>,
133    /// Current transaction ID (if any). Behind a Mutex so that GQL commands
134    /// (`START TRANSACTION`, `COMMIT`, `ROLLBACK`) can manage transactions
135    /// from within `execute(&self)`.
136    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
137    /// Whether the current transaction is read-only (blocks mutations).
138    read_only_tx: parking_lot::Mutex<bool>,
139    /// Whether the database itself is read-only (set at open time, never changes).
140    /// When true, `read_only_tx` is always true regardless of transaction flags.
141    db_read_only: bool,
142    /// The identity bound to this session (determines permission level).
143    identity: crate::auth::Identity,
144    /// Whether the session is in auto-commit mode.
145    auto_commit: bool,
146    /// Adaptive execution configuration.
147    #[allow(dead_code)] // Stored for future adaptive re-optimization during execution
148    adaptive_config: AdaptiveConfig,
149    /// Whether to use factorized execution for multi-hop queries.
150    factorized_execution: bool,
151    /// The graph data model this session operates on.
152    graph_model: GraphModel,
153    /// Maximum time a query may run before being cancelled.
154    query_timeout: Option<Duration>,
155    /// Maximum size in bytes for a single property value.
156    max_property_size: Option<usize>,
157    /// Buffer manager for memory-aware execution (spill decisions).
158    buffer_manager: Option<Arc<grafeo_common::memory::buffer::BufferManager>>,
159    /// Shared commit counter for triggering auto-GC.
160    commit_counter: Arc<AtomicUsize>,
161    /// GC every N commits (0 = disabled).
162    gc_interval: usize,
163    /// Node count at the start of the current transaction (for PreparedCommit stats).
164    transaction_start_node_count: AtomicUsize,
165    /// Edge count at the start of the current transaction (for PreparedCommit stats).
166    transaction_start_edge_count: AtomicUsize,
167    /// WAL for logging schema changes.
168    #[cfg(feature = "wal")]
169    wal: Option<Arc<grafeo_storage::wal::LpgWal>>,
170    /// Shared WAL graph context tracker for named graph awareness.
171    #[cfg(feature = "wal")]
172    wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
173    /// CDC log for change tracking.
174    #[cfg(feature = "cdc")]
175    cdc_log: Arc<crate::cdc::CdcLog>,
176    /// Buffered CDC events for the current transaction.
177    /// Flushed to `cdc_log` on commit, discarded on rollback.
178    #[cfg(feature = "cdc")]
179    cdc_pending_events: Option<Arc<parking_lot::Mutex<Vec<crate::cdc::ChangeEvent>>>>,
180    /// Current graph name (for multi-graph USE GRAPH support). None = default graph.
181    current_graph: parking_lot::Mutex<Option<String>>,
182    /// Current schema name (ISO/IEC 39075 Section 4.7.3: independent from session graph).
183    /// None = "not set" (uses default schema).
184    current_schema: parking_lot::Mutex<Option<String>>,
185    /// Session time zone override.
186    time_zone: parking_lot::Mutex<Option<String>>,
187    /// Session-level parameters (SET PARAMETER).
188    session_params:
189        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
190    /// Override epoch for time-travel queries (None = use transaction/current epoch).
191    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
192    /// Savepoints within the current transaction.
193    savepoints: parking_lot::Mutex<Vec<SavepointState>>,
194    /// Nesting depth for nested transactions (0 = outermost).
195    /// Nested `START TRANSACTION` creates an auto-savepoint; nested `COMMIT`
196    /// releases it, nested `ROLLBACK` rolls back to it.
197    transaction_nesting_depth: parking_lot::Mutex<u32>,
198    /// Named graphs touched during the current transaction (for cross-graph atomicity).
199    /// `None` represents the default graph. Populated at `BEGIN` time and on each
200    /// `USE GRAPH` / `SESSION SET GRAPH` switch within a transaction.
201    touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
202    /// Count of active `ResultStream`s pinned to this session. Commit and
203    /// rollback block while any streams are outstanding so mid-iteration
204    /// snapshots are not invalidated.
205    active_streams: AtomicUsize,
206    /// Shared metrics registry (populated when the `metrics` feature is enabled).
207    #[cfg(feature = "metrics")]
208    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
209    /// Transaction start time for duration tracking.
210    #[cfg(feature = "metrics")]
211    tx_start_time: parking_lot::Mutex<Option<Instant>>,
212    /// Named graph projections shared with the database.
213    #[cfg(feature = "lpg")]
214    projections: Arc<
215        parking_lot::RwLock<
216            std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>,
217        >,
218    >,
219}
220
221/// Role of the session's internal `LpgStore`.
222#[cfg(feature = "lpg")]
223#[derive(Clone, Copy)]
224enum LpgBackend {
225    /// The internal `LpgStore` is the session's active backing store (possibly
226    /// wrapped by WAL/CDC/Layered decorators on the read/write path). Search
227    /// procedures can reach its HNSW/BM25 indexes.
228    Active,
229    /// The internal `LpgStore` is an empty placeholder because the session is
230    /// backed by an external `GraphStoreSearch` implementation. Search
231    /// procedures must not use it.
232    Placeholder,
233}
234
235/// Per-graph savepoint snapshot, capturing the store state at the time of the savepoint.
236#[derive(Clone)]
237struct GraphSavepoint {
238    graph_name: Option<String>,
239    next_node_id: u64,
240    next_edge_id: u64,
241    undo_log_position: usize,
242}
243
244/// Savepoint state: name + per-graph snapshots + the graph that was active.
245#[derive(Clone)]
246struct SavepointState {
247    name: String,
248    graph_snapshots: Vec<GraphSavepoint>,
249    /// The graph that was active when the savepoint was created.
250    /// Reserved for future use (e.g., restoring graph context on rollback).
251    #[allow(dead_code)]
252    active_graph: Option<String>,
253    /// CDC event buffer position at savepoint creation.
254    /// On rollback-to-savepoint, the buffer is truncated to this position.
255    #[cfg(feature = "cdc")]
256    cdc_event_position: usize,
257}
258
259impl Session {
260    /// Creates a new session with adaptive execution configuration.
261    #[cfg(feature = "lpg")]
262    #[allow(dead_code)] // Used when lpg enabled without triple-store
263    pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
264        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreSearch>;
265        let graph_store_mut = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
266        Self {
267            store,
268            lpg_backend: LpgBackend::Active,
269            graph_store,
270            graph_store_mut,
271            catalog: cfg.catalog,
272            #[cfg(feature = "triple-store")]
273            rdf_store: Arc::new(RdfStore::new()),
274            transaction_manager: cfg.transaction_manager,
275            query_cache: cfg.query_cache,
276            current_transaction: parking_lot::Mutex::new(None),
277            read_only_tx: parking_lot::Mutex::new(cfg.read_only),
278            db_read_only: cfg.read_only,
279            identity: cfg.identity,
280            auto_commit: true,
281            adaptive_config: cfg.adaptive_config,
282            factorized_execution: cfg.factorized_execution,
283            graph_model: cfg.graph_model,
284            query_timeout: cfg.query_timeout,
285            max_property_size: cfg.max_property_size,
286            buffer_manager: cfg.buffer_manager,
287            commit_counter: cfg.commit_counter,
288            gc_interval: cfg.gc_interval,
289            transaction_start_node_count: AtomicUsize::new(0),
290            transaction_start_edge_count: AtomicUsize::new(0),
291            #[cfg(feature = "wal")]
292            wal: None,
293            #[cfg(feature = "wal")]
294            wal_graph_context: None,
295            #[cfg(feature = "cdc")]
296            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
297            #[cfg(feature = "cdc")]
298            cdc_pending_events: None,
299            current_graph: parking_lot::Mutex::new(None),
300            current_schema: parking_lot::Mutex::new(None),
301            time_zone: parking_lot::Mutex::new(None),
302            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
303            viewing_epoch_override: parking_lot::Mutex::new(None),
304            savepoints: parking_lot::Mutex::new(Vec::new()),
305            transaction_nesting_depth: parking_lot::Mutex::new(0),
306            touched_graphs: parking_lot::Mutex::new(Vec::new()),
307            active_streams: AtomicUsize::new(0),
308            #[cfg(feature = "metrics")]
309            metrics: None,
310            #[cfg(feature = "metrics")]
311            tx_start_time: parking_lot::Mutex::new(None),
312            projections: cfg.projections,
313        }
314    }
315
316    /// Overrides the graph store and write store used by the query engine.
317    ///
318    /// Used by the layered store integration: the session's `store` field is
319    /// the overlay `LpgStore` (for MVCC), but reads and writes should route
320    /// through the `LayeredStore` (which merges base + overlay).
321    #[cfg(all(feature = "compact-store", feature = "lpg"))]
322    pub(crate) fn override_stores(
323        &mut self,
324        read_store: Arc<dyn GraphStoreSearch>,
325        write_store: Option<Arc<dyn GraphStoreMut>>,
326    ) {
327        self.graph_store = read_store;
328        self.graph_store_mut = write_store;
329    }
330
331    /// Sets the WAL for this session (shared with the database).
332    ///
333    /// This also wraps `graph_store` in a [`WalGraphStore`] so that mutation
334    /// operators (INSERT, DELETE, SET via queries) log to the WAL.
335    #[cfg(all(feature = "wal", feature = "lpg"))]
336    pub(crate) fn set_wal(
337        &mut self,
338        wal: Arc<grafeo_storage::wal::LpgWal>,
339        wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
340    ) {
341        // Wrap the graph store so query-engine mutations are WAL-logged
342        let wal_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
343            Arc::clone(&self.store),
344            Arc::clone(&wal),
345            Arc::clone(&wal_graph_context),
346        ));
347        self.graph_store = Arc::clone(&wal_store) as Arc<dyn GraphStoreSearch>;
348        self.graph_store_mut = Some(wal_store as Arc<dyn GraphStoreMut>);
349        self.wal = Some(wal);
350        self.wal_graph_context = Some(wal_graph_context);
351    }
352
353    /// Sets the CDC log for this session (shared with the database).
354    ///
355    /// Wraps the current write store with a `CdcGraphStore` decorator so
356    /// that all session mutations (INSERT, SET, DELETE via query execution)
357    /// buffer CDC events. The buffer is flushed to the `CdcLog` on commit
358    /// and discarded on rollback.
359    #[cfg(feature = "cdc")]
360    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
361        // Wrap the WRITE store only with CdcGraphStore to intercept mutations.
362        // The read store (self.graph_store) is left unchanged for zero read overhead.
363        if let Some(ref write_store) = self.graph_store_mut {
364            let cdc_store = Arc::new(crate::database::cdc_store::CdcGraphStore::new(
365                Arc::clone(write_store),
366                Arc::clone(&cdc_log),
367            ));
368            self.cdc_pending_events = Some(cdc_store.pending_events());
369            self.graph_store_mut = Some(cdc_store as Arc<dyn grafeo_core::graph::GraphStoreMut>);
370        }
371        self.cdc_log = cdc_log;
372    }
373
374    /// Sets the metrics registry for this session (shared with the database).
375    #[cfg(feature = "metrics")]
376    pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
377        self.metrics = Some(metrics);
378    }
379
380    /// Creates a session backed by an external graph store.
381    ///
382    /// The external store handles all data operations. Transaction management
383    /// (begin/commit/rollback) is not supported for external stores.
384    ///
385    /// # Errors
386    ///
387    /// Returns an error if the internal arena allocation fails (out of memory).
388    pub(crate) fn with_external_store(
389        read_store: Arc<dyn GraphStoreSearch>,
390        write_store: Option<Arc<dyn GraphStoreMut>>,
391        cfg: SessionConfig,
392    ) -> Result<Self> {
393        Ok(Self {
394            #[cfg(feature = "lpg")]
395            store: Arc::new(LpgStore::new()?),
396            #[cfg(feature = "lpg")]
397            lpg_backend: LpgBackend::Placeholder,
398            graph_store: read_store,
399            graph_store_mut: write_store,
400            catalog: cfg.catalog,
401            #[cfg(feature = "triple-store")]
402            rdf_store: Arc::new(RdfStore::new()),
403            transaction_manager: cfg.transaction_manager,
404            query_cache: cfg.query_cache,
405            current_transaction: parking_lot::Mutex::new(None),
406            read_only_tx: parking_lot::Mutex::new(cfg.read_only),
407            db_read_only: cfg.read_only,
408            identity: cfg.identity,
409            auto_commit: true,
410            adaptive_config: cfg.adaptive_config,
411            factorized_execution: cfg.factorized_execution,
412            graph_model: cfg.graph_model,
413            query_timeout: cfg.query_timeout,
414            max_property_size: cfg.max_property_size,
415            buffer_manager: cfg.buffer_manager,
416            commit_counter: cfg.commit_counter,
417            gc_interval: cfg.gc_interval,
418            transaction_start_node_count: AtomicUsize::new(0),
419            transaction_start_edge_count: AtomicUsize::new(0),
420            #[cfg(feature = "wal")]
421            wal: None,
422            #[cfg(feature = "wal")]
423            wal_graph_context: None,
424            #[cfg(feature = "cdc")]
425            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
426            #[cfg(feature = "cdc")]
427            cdc_pending_events: None,
428            current_graph: parking_lot::Mutex::new(None),
429            current_schema: parking_lot::Mutex::new(None),
430            time_zone: parking_lot::Mutex::new(None),
431            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
432            viewing_epoch_override: parking_lot::Mutex::new(None),
433            savepoints: parking_lot::Mutex::new(Vec::new()),
434            transaction_nesting_depth: parking_lot::Mutex::new(0),
435            touched_graphs: parking_lot::Mutex::new(Vec::new()),
436            active_streams: AtomicUsize::new(0),
437            #[cfg(feature = "metrics")]
438            metrics: None,
439            #[cfg(feature = "metrics")]
440            tx_start_time: parking_lot::Mutex::new(None),
441            #[cfg(feature = "lpg")]
442            projections: cfg.projections,
443        })
444    }
445
446    /// Returns the graph model this session operates on.
447    #[must_use]
448    pub fn graph_model(&self) -> GraphModel {
449        self.graph_model
450    }
451
452    /// Returns the identity bound to this session.
453    #[must_use]
454    pub fn identity(&self) -> &crate::auth::Identity {
455        &self.identity
456    }
457
458    // === Session State Management ===
459
460    /// Sets the current graph for this session (USE GRAPH).
461    pub fn use_graph(&self, name: &str) {
462        *self.current_graph.lock() = Some(name.to_string());
463        self.track_graph_touch();
464    }
465
466    /// Returns the current graph name, if any.
467    #[must_use]
468    pub fn current_graph(&self) -> Option<String> {
469        self.current_graph.lock().clone()
470    }
471
472    /// Sets the current schema for this session (SESSION SET SCHEMA).
473    ///
474    /// Per ISO/IEC 39075 Section 7.1 GR1, this is independent of the session graph.
475    pub fn set_schema(&self, name: &str) {
476        *self.current_schema.lock() = Some(name.to_string());
477        self.track_graph_touch();
478    }
479
480    /// Returns the current schema name, if any.
481    ///
482    /// `None` means "not set", which resolves to the default schema.
483    #[must_use]
484    pub fn current_schema(&self) -> Option<String> {
485        self.current_schema.lock().clone()
486    }
487
488    /// Computes the effective storage key for a graph, accounting for schema context.
489    ///
490    /// Per ISO/IEC 39075 Section 17.2, graphs resolve relative to the current schema.
491    /// Uses `/` as separator since it is invalid in GQL identifiers.
492    fn effective_graph_key(&self, graph_name: &str) -> String {
493        let schema = self.current_schema.lock().clone();
494        match schema {
495            Some(s) => format!("{s}/{graph_name}"),
496            None => graph_name.to_string(),
497        }
498    }
499
500    /// Computes the effective storage key for a type, accounting for schema context.
501    ///
502    /// Mirrors `effective_graph_key()`: types resolve relative to the current schema.
503    fn effective_type_key(&self, type_name: &str) -> String {
504        let schema = self.current_schema.lock().clone();
505        match schema {
506            Some(s) => format!("{s}/{type_name}"),
507            None => type_name.to_string(),
508        }
509    }
510
511    /// Returns the effective storage key for the current graph, accounting for schema.
512    ///
513    /// Combines `current_schema` and `current_graph` into a flat lookup key.
514    fn active_graph_storage_key(&self) -> Option<String> {
515        let graph = self.current_graph.lock().clone();
516        let schema = self.current_schema.lock().clone();
517        match (&schema, &graph) {
518            (None, None) => None,
519            (Some(s), None) => Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}")),
520            (None, Some(name)) if name.eq_ignore_ascii_case("default") => None,
521            (Some(s), Some(name)) if name.eq_ignore_ascii_case("default") => {
522                Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}"))
523            }
524            (None, Some(name)) => Some(name.clone()),
525            (Some(s), Some(g)) => Some(format!("{s}/{g}")),
526        }
527    }
528
529    /// Returns the graph store for the currently active graph.
530    ///
531    /// If `current_graph` is `None` or `"default"`, returns the session's
532    /// default `graph_store` (already WAL-wrapped for the default graph).
533    /// Otherwise looks up the named graph in the root store and wraps it
534    /// in a [`WalGraphStore`] so mutations are WAL-logged with the correct
535    /// graph context.
536    fn active_store(&self) -> Arc<dyn GraphStoreSearch> {
537        let key = self.active_graph_storage_key();
538        match key {
539            None => Arc::clone(&self.graph_store),
540            #[cfg(feature = "lpg")]
541            Some(ref name) => match self.store.graph(name) {
542                Some(named_store) => {
543                    #[cfg(feature = "wal")]
544                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
545                        return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
546                            named_store,
547                            Arc::clone(wal),
548                            name.clone(),
549                            Arc::clone(ctx),
550                        )) as Arc<dyn GraphStoreSearch>;
551                    }
552                    named_store as Arc<dyn GraphStoreSearch>
553                }
554                None => Arc::clone(&self.graph_store),
555            },
556            #[cfg(not(feature = "lpg"))]
557            Some(_) => Arc::clone(&self.graph_store),
558        }
559    }
560
561    /// Returns the writable store for the active graph, if available.
562    ///
563    /// Returns `None` for read-only databases. For named graphs, wraps
564    /// the store with WAL logging when durability is enabled.
565    fn active_write_store(&self) -> Option<Arc<dyn GraphStoreMut>> {
566        let key = self.active_graph_storage_key();
567        match key {
568            None => self.graph_store_mut.as_ref().map(Arc::clone),
569            #[cfg(feature = "lpg")]
570            Some(ref name) => match self.store.graph(name) {
571                Some(named_store) => {
572                    let mut store: Arc<dyn GraphStoreMut> = named_store;
573
574                    #[cfg(feature = "wal")]
575                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
576                        store = Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
577                            // WAL needs Arc<LpgStore>, get it fresh
578                            self.store
579                                .graph(name)
580                                .unwrap_or_else(|| Arc::clone(&self.store)),
581                            Arc::clone(wal),
582                            name.clone(),
583                            Arc::clone(ctx),
584                        ));
585                    }
586
587                    #[cfg(feature = "cdc")]
588                    if let Some(ref pending) = self.cdc_pending_events {
589                        store = Arc::new(crate::database::cdc_store::CdcGraphStore::wrap(
590                            store,
591                            Arc::clone(&self.cdc_log),
592                            Arc::clone(pending),
593                        ));
594                    }
595
596                    Some(store)
597                }
598                None => self.graph_store_mut.as_ref().map(Arc::clone),
599            },
600            #[cfg(not(feature = "lpg"))]
601            Some(_) => self.graph_store_mut.as_ref().map(Arc::clone),
602        }
603    }
604
605    /// Returns the concrete `LpgStore` for the currently active graph.
606    ///
607    /// Used by direct CRUD methods that need the concrete store type
608    /// for versioned operations.
609    #[cfg(feature = "lpg")]
610    fn active_lpg_store(&self) -> Arc<LpgStore> {
611        let key = self.active_graph_storage_key();
612        match key {
613            None => Arc::clone(&self.store),
614            Some(ref name) => self
615                .store
616                .graph(name)
617                .unwrap_or_else(|| Arc::clone(&self.store)),
618        }
619    }
620
621    /// Resolves a graph name to a concrete `LpgStore`.
622    /// `None` and `"default"` resolve to the session's root store.
623    #[cfg(feature = "lpg")]
624    fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
625        match graph_name {
626            None => Arc::clone(&self.store),
627            Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
628            Some(name) => self
629                .store
630                .graph(name)
631                .unwrap_or_else(|| Arc::clone(&self.store)),
632        }
633    }
634
635    /// Records the current graph as "touched" if a transaction is active.
636    ///
637    /// Uses the full storage key (schema/graph) so that commit/rollback
638    /// can resolve the correct store via `resolve_store`. Called from
639    /// every setter that can change the active key (`use_graph`,
640    /// `set_schema`, `reset_*`) so mid-transaction context switches are
641    /// always captured; callers that mutate the active key via those
642    /// setters do not need to invoke this directly.
643    fn track_graph_touch(&self) {
644        if self.current_transaction.lock().is_some() {
645            let key = self.active_graph_storage_key();
646            let mut touched = self.touched_graphs.lock();
647            if !touched.contains(&key) {
648                touched.push(key);
649            }
650        }
651    }
652
653    /// Sets the session time zone.
654    pub fn set_time_zone(&self, tz: &str) {
655        *self.time_zone.lock() = Some(tz.to_string());
656    }
657
658    /// Returns the session time zone, if set.
659    #[must_use]
660    pub fn time_zone(&self) -> Option<String> {
661        self.time_zone.lock().clone()
662    }
663
664    /// Sets a session parameter.
665    pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
666        self.session_params.lock().insert(key.to_string(), value);
667    }
668
669    /// Gets a session parameter by cloning it.
670    #[must_use]
671    pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
672        self.session_params.lock().get(key).cloned()
673    }
674
675    /// Resets all session state to defaults (ISO/IEC 39075 Section 7.2).
676    pub fn reset_session(&self) {
677        *self.current_schema.lock() = None;
678        *self.current_graph.lock() = None;
679        *self.time_zone.lock() = None;
680        self.session_params.lock().clear();
681        *self.viewing_epoch_override.lock() = None;
682        self.track_graph_touch();
683    }
684
685    /// Resets only the session schema (Section 7.2 GR1).
686    pub fn reset_schema(&self) {
687        *self.current_schema.lock() = None;
688        self.track_graph_touch();
689    }
690
691    /// Resets only the session graph (Section 7.2 GR2).
692    pub fn reset_graph(&self) {
693        *self.current_graph.lock() = None;
694        self.track_graph_touch();
695    }
696
697    /// Resets only the session time zone (Section 7.2 GR3).
698    pub fn reset_time_zone(&self) {
699        *self.time_zone.lock() = None;
700    }
701
702    /// Resets only session parameters (Section 7.2 GR4).
703    pub fn reset_parameters(&self) {
704        self.session_params.lock().clear();
705    }
706
707    // --- Time-travel API ---
708
709    /// Sets a viewing epoch override for time-travel queries.
710    ///
711    /// While set, all queries on this session see the database as it existed
712    /// at the given epoch. Use [`clear_viewing_epoch`](Self::clear_viewing_epoch)
713    /// to return to normal behavior.
714    pub fn set_viewing_epoch(&self, epoch: EpochId) {
715        *self.viewing_epoch_override.lock() = Some(epoch);
716    }
717
718    /// Clears the viewing epoch override, returning to normal behavior.
719    pub fn clear_viewing_epoch(&self) {
720        *self.viewing_epoch_override.lock() = None;
721    }
722
723    /// Returns the current viewing epoch override, if any.
724    #[must_use]
725    pub fn viewing_epoch(&self) -> Option<EpochId> {
726        *self.viewing_epoch_override.lock()
727    }
728
729    /// Returns all versions of a node with their creation/deletion epochs.
730    ///
731    /// Properties and labels reflect the current state (not versioned per-epoch).
732    #[cfg(feature = "lpg")]
733    #[must_use]
734    pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
735        self.active_lpg_store().get_node_history(id)
736    }
737
738    /// Returns all versions of an edge with their creation/deletion epochs.
739    ///
740    /// Properties reflect the current state (not versioned per-epoch).
741    #[cfg(feature = "lpg")]
742    #[must_use]
743    pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
744        self.active_lpg_store().get_edge_history(id)
745    }
746
747    /// Checks that the session's graph model supports LPG operations.
748    fn require_lpg(&self, language: &str) -> Result<()> {
749        if self.graph_model == GraphModel::Rdf {
750            return Err(grafeo_common::utils::error::Error::Internal(format!(
751                "This is an RDF database. {language} queries require an LPG database."
752            )));
753        }
754        Ok(())
755    }
756
757    /// Checks that the session's identity is permitted to execute the given
758    /// statement kind. Returns an error if the role is insufficient.
759    ///
760    /// Short-circuits for Admin identities (the common case for embedded use
761    /// and benchmarks) to avoid any overhead on the hot path.
762    #[inline]
763    fn require_permission(&self, kind: crate::auth::StatementKind) -> Result<()> {
764        // Fast path: Admin can do everything
765        if self.identity.can_admin() {
766            return Ok(());
767        }
768        crate::auth::check_permission(&self.identity, kind).map_err(|denied| {
769            grafeo_common::utils::error::Error::Query(grafeo_common::utils::error::QueryError::new(
770                grafeo_common::utils::error::QueryErrorKind::Semantic,
771                denied.to_string(),
772            ))
773        })
774    }
775
776    /// Executes a session or transaction command, returning an empty result.
777    #[cfg(feature = "gql")]
778    fn execute_session_command(
779        &self,
780        cmd: grafeo_adapters::query::gql::ast::SessionCommand,
781    ) -> Result<QueryResult> {
782        use grafeo_adapters::query::gql::ast::SessionCommand;
783        #[cfg(feature = "lpg")]
784        use grafeo_adapters::query::gql::ast::TransactionIsolationLevel;
785        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
786
787        // Check role-based permission for graph management commands
788        match &cmd {
789            SessionCommand::CreateGraph { .. }
790            | SessionCommand::DropGraph { .. }
791            | SessionCommand::CreateProjection { .. }
792            | SessionCommand::DropProjection { .. } => {
793                self.require_permission(crate::auth::StatementKind::Write)?;
794            }
795            _ => {} // Session state + transaction control: always allowed
796        }
797
798        // Check per-graph grants for graph-scoped commands
799        if self.identity.has_grants() {
800            match &cmd {
801                SessionCommand::CreateGraph { name, .. }
802                | SessionCommand::DropGraph { name, .. }
803                    if !self
804                        .identity
805                        .can_access_graph(name, crate::auth::Role::ReadWrite) =>
806                {
807                    return Err(Error::Query(QueryError::new(
808                        QueryErrorKind::Semantic,
809                        format!(
810                            "permission denied: no grant for graph '{name}' (user: {})",
811                            self.identity.user_id()
812                        ),
813                    )));
814                }
815                _ => {}
816            }
817        }
818
819        // Block DDL in read-only transactions (ISO/IEC 39075 Section 8)
820        if *self.read_only_tx.lock() {
821            match &cmd {
822                SessionCommand::CreateGraph { .. }
823                | SessionCommand::DropGraph { .. }
824                | SessionCommand::CreateProjection { .. }
825                | SessionCommand::DropProjection { .. } => {
826                    return Err(Error::Transaction(
827                        grafeo_common::utils::error::TransactionError::ReadOnly,
828                    ));
829                }
830                _ => {} // Session state + transaction control allowed
831            }
832        }
833
834        match cmd {
835            #[cfg(feature = "lpg")]
836            SessionCommand::CreateGraph {
837                name,
838                if_not_exists,
839                typed,
840                like_graph,
841                copy_of,
842                open: _,
843            } => {
844                // ISO/IEC 39075 Section 12.4: graphs are created within the current schema
845                if name.contains('/') {
846                    return Err(Error::Query(QueryError::new(
847                        QueryErrorKind::Semantic,
848                        format!(
849                            "Graph name '{name}' must not contain '/' (reserved as schema/graph separator)"
850                        ),
851                    )));
852                }
853                let storage_key = self.effective_graph_key(&name);
854
855                // Validate source graph exists for LIKE / AS COPY OF
856                if let Some(ref src) = like_graph {
857                    let src_key = self.effective_graph_key(src);
858                    if self.store.graph(&src_key).is_none() {
859                        return Err(Error::Query(QueryError::new(
860                            QueryErrorKind::Semantic,
861                            format!("Source graph '{src}' does not exist"),
862                        )));
863                    }
864                }
865                if let Some(ref src) = copy_of {
866                    let src_key = self.effective_graph_key(src);
867                    if self.store.graph(&src_key).is_none() {
868                        return Err(Error::Query(QueryError::new(
869                            QueryErrorKind::Semantic,
870                            format!("Source graph '{src}' does not exist"),
871                        )));
872                    }
873                }
874
875                let created = self
876                    .store
877                    .create_graph(&storage_key)
878                    .map_err(|e| Error::Internal(e.to_string()))?;
879                if !created && !if_not_exists {
880                    return Err(Error::Query(QueryError::new(
881                        QueryErrorKind::Semantic,
882                        format!("Graph '{name}' already exists"),
883                    )));
884                }
885                if created {
886                    #[cfg(feature = "wal")]
887                    self.log_schema_wal(&grafeo_storage::wal::WalRecord::CreateNamedGraph {
888                        name: storage_key.clone(),
889                    });
890                }
891
892                // AS COPY OF: copy data from source graph
893                if let Some(ref src) = copy_of {
894                    let src_key = self.effective_graph_key(src);
895                    self.store
896                        .copy_graph(Some(&src_key), Some(&storage_key))
897                        .map_err(|e| Error::Internal(e.to_string()))?;
898                }
899
900                // Bind to graph type if specified.
901                // If the parser produced a '/' in the name it is already a qualified
902                // "schema/type" key; otherwise resolve against the current schema.
903                if let Some(type_name) = typed
904                    && let Err(e) = self.catalog.bind_graph_type(
905                        &storage_key,
906                        if type_name.contains('/') {
907                            type_name.clone()
908                        } else {
909                            self.effective_type_key(&type_name)
910                        },
911                    )
912                {
913                    return Err(Error::Query(QueryError::new(
914                        QueryErrorKind::Semantic,
915                        e.to_string(),
916                    )));
917                }
918
919                // LIKE: copy graph type binding from source
920                if let Some(ref src) = like_graph {
921                    let src_key = self.effective_graph_key(src);
922                    if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
923                        let _ = self.catalog.bind_graph_type(&storage_key, src_type);
924                    }
925                }
926
927                Ok(QueryResult::empty())
928            }
929            #[cfg(feature = "lpg")]
930            SessionCommand::DropGraph { name, if_exists } => {
931                let storage_key = self.effective_graph_key(&name);
932                let dropped = self.store.drop_graph(&storage_key);
933                if !dropped && !if_exists {
934                    return Err(Error::Query(QueryError::new(
935                        QueryErrorKind::Semantic,
936                        format!("Graph '{name}' does not exist"),
937                    )));
938                }
939                if dropped {
940                    #[cfg(feature = "wal")]
941                    self.log_schema_wal(&grafeo_storage::wal::WalRecord::DropNamedGraph {
942                        name: storage_key.clone(),
943                    });
944                    // If this session was using the dropped graph, reset to default
945                    let mut current = self.current_graph.lock();
946                    if current
947                        .as_deref()
948                        .is_some_and(|g| g.eq_ignore_ascii_case(&name))
949                    {
950                        *current = None;
951                    }
952                }
953                Ok(QueryResult::empty())
954            }
955            #[cfg(feature = "lpg")]
956            SessionCommand::UseGraph(name) => {
957                // Check per-graph grant before switching
958                if self.identity.has_grants()
959                    && !name.eq_ignore_ascii_case("default")
960                    && !self
961                        .identity
962                        .can_access_graph(&name, crate::auth::Role::ReadOnly)
963                {
964                    return Err(Error::Query(QueryError::new(
965                        QueryErrorKind::Semantic,
966                        format!(
967                            "permission denied: no grant for graph '{name}' (user: {})",
968                            self.identity.user_id()
969                        ),
970                    )));
971                }
972                // Verify graph exists (resolve within current schema)
973                let effective_key = self.effective_graph_key(&name);
974                if !name.eq_ignore_ascii_case("default")
975                    && self.store.graph(&effective_key).is_none()
976                {
977                    return Err(Error::Query(QueryError::new(
978                        QueryErrorKind::Semantic,
979                        format!("Graph '{name}' does not exist"),
980                    )));
981                }
982                self.use_graph(&name);
983                Ok(QueryResult::empty())
984            }
985            #[cfg(feature = "lpg")]
986            SessionCommand::SessionSetGraph(name) => {
987                // ISO/IEC 39075 Section 7.1 GR2: set session graph (resolved within current schema)
988                // Check per-graph grant before switching (same as USE GRAPH)
989                if self.identity.has_grants()
990                    && !name.eq_ignore_ascii_case("default")
991                    && !self
992                        .identity
993                        .can_access_graph(&name, crate::auth::Role::ReadOnly)
994                {
995                    return Err(Error::Query(QueryError::new(
996                        QueryErrorKind::Semantic,
997                        format!(
998                            "permission denied: no grant for graph '{name}' (user: {})",
999                            self.identity.user_id()
1000                        ),
1001                    )));
1002                }
1003                let effective_key = self.effective_graph_key(&name);
1004                if !name.eq_ignore_ascii_case("default")
1005                    && self.store.graph(&effective_key).is_none()
1006                {
1007                    return Err(Error::Query(QueryError::new(
1008                        QueryErrorKind::Semantic,
1009                        format!("Graph '{name}' does not exist"),
1010                    )));
1011                }
1012                self.use_graph(&name);
1013                Ok(QueryResult::empty())
1014            }
1015            SessionCommand::SessionSetSchema(name) => {
1016                // ISO/IEC 39075 Section 7.1 GR1: set session schema (independent of graph)
1017                if !self.catalog.schema_exists(&name) {
1018                    return Err(Error::Query(QueryError::new(
1019                        QueryErrorKind::Semantic,
1020                        format!("Schema '{name}' does not exist"),
1021                    )));
1022                }
1023                self.set_schema(&name);
1024                Ok(QueryResult::empty())
1025            }
1026            SessionCommand::SessionSetTimeZone(tz) => {
1027                self.set_time_zone(&tz);
1028                Ok(QueryResult::empty())
1029            }
1030            #[cfg(feature = "gql")]
1031            SessionCommand::SessionSetParameter(key, expr) => {
1032                if key.eq_ignore_ascii_case("viewing_epoch") {
1033                    match Self::eval_integer_literal(&expr) {
1034                        Some(n) if n >= 0 => {
1035                            // reason: guard ensures n >= 0
1036                            #[allow(clippy::cast_sign_loss)]
1037                            let epoch = n as u64;
1038                            self.set_viewing_epoch(EpochId::new(epoch));
1039                            Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
1040                        }
1041                        _ => Err(Error::Query(QueryError::new(
1042                            QueryErrorKind::Semantic,
1043                            "viewing_epoch must be a non-negative integer literal",
1044                        ))),
1045                    }
1046                } else {
1047                    // For now, store parameter name with Null value.
1048                    // Full expression evaluation would require building and executing a plan.
1049                    self.set_parameter(&key, Value::Null);
1050                    Ok(QueryResult::empty())
1051                }
1052            }
1053            SessionCommand::SessionReset(target) => {
1054                use grafeo_adapters::query::gql::ast::SessionResetTarget;
1055                match target {
1056                    SessionResetTarget::All => self.reset_session(),
1057                    SessionResetTarget::Schema => self.reset_schema(),
1058                    SessionResetTarget::Graph => self.reset_graph(),
1059                    SessionResetTarget::TimeZone => self.reset_time_zone(),
1060                    SessionResetTarget::Parameters => self.reset_parameters(),
1061                }
1062                Ok(QueryResult::empty())
1063            }
1064            SessionCommand::SessionClose => {
1065                self.reset_session();
1066                Ok(QueryResult::empty())
1067            }
1068            #[cfg(feature = "lpg")]
1069            SessionCommand::StartTransaction {
1070                read_only,
1071                isolation_level,
1072            } => {
1073                let engine_level = isolation_level.map(|l| match l {
1074                    TransactionIsolationLevel::ReadCommitted => {
1075                        crate::transaction::IsolationLevel::ReadCommitted
1076                    }
1077                    TransactionIsolationLevel::SnapshotIsolation => {
1078                        crate::transaction::IsolationLevel::SnapshotIsolation
1079                    }
1080                    TransactionIsolationLevel::Serializable => {
1081                        crate::transaction::IsolationLevel::Serializable
1082                    }
1083                });
1084                self.begin_transaction_inner(read_only, engine_level)?;
1085                Ok(QueryResult::status("Transaction started"))
1086            }
1087            #[cfg(feature = "lpg")]
1088            SessionCommand::Commit => {
1089                self.commit_inner()?;
1090                Ok(QueryResult::status("Transaction committed"))
1091            }
1092            #[cfg(feature = "lpg")]
1093            SessionCommand::Rollback => {
1094                self.rollback_inner()?;
1095                Ok(QueryResult::status("Transaction rolled back"))
1096            }
1097            #[cfg(feature = "lpg")]
1098            SessionCommand::Savepoint(name) => {
1099                self.savepoint(&name)?;
1100                Ok(QueryResult::status(format!("Savepoint '{name}' created")))
1101            }
1102            #[cfg(feature = "lpg")]
1103            SessionCommand::RollbackToSavepoint(name) => {
1104                self.rollback_to_savepoint(&name)?;
1105                Ok(QueryResult::status(format!(
1106                    "Rolled back to savepoint '{name}'"
1107                )))
1108            }
1109            #[cfg(feature = "lpg")]
1110            SessionCommand::ReleaseSavepoint(name) => {
1111                self.release_savepoint(&name)?;
1112                Ok(QueryResult::status(format!("Savepoint '{name}' released")))
1113            }
1114            #[cfg(feature = "lpg")]
1115            SessionCommand::CreateProjection {
1116                name,
1117                node_labels,
1118                edge_types,
1119            } => {
1120                use grafeo_core::graph::{GraphProjection, ProjectionSpec};
1121                use std::collections::hash_map::Entry;
1122
1123                let spec = ProjectionSpec::new()
1124                    .with_node_labels(node_labels)
1125                    .with_edge_types(edge_types);
1126
1127                let store = self.active_store();
1128                let projection = Arc::new(GraphProjection::new(store, spec));
1129                let mut projections = self.projections.write();
1130                match projections.entry(name.clone()) {
1131                    Entry::Occupied(_) => Err(Error::Query(QueryError::new(
1132                        QueryErrorKind::Semantic,
1133                        format!("Projection '{name}' already exists"),
1134                    ))),
1135                    Entry::Vacant(e) => {
1136                        e.insert(projection);
1137                        Ok(QueryResult::status(format!("Projection '{name}' created")))
1138                    }
1139                }
1140            }
1141            #[cfg(feature = "lpg")]
1142            SessionCommand::DropProjection { name } => {
1143                let removed = self.projections.write().remove(&name).is_some();
1144                if !removed {
1145                    return Err(Error::Query(QueryError::new(
1146                        QueryErrorKind::Semantic,
1147                        format!("Projection '{name}' does not exist"),
1148                    )));
1149                }
1150                Ok(QueryResult::status(format!("Projection '{name}' dropped")))
1151            }
1152            #[cfg(feature = "lpg")]
1153            SessionCommand::ShowProjections => {
1154                let mut names: Vec<String> = self.projections.read().keys().cloned().collect();
1155                names.sort();
1156                let rows: Vec<Vec<Value>> =
1157                    names.into_iter().map(|n| vec![Value::from(n)]).collect();
1158                Ok(QueryResult {
1159                    columns: vec!["name".to_string()],
1160                    column_types: Vec::new(),
1161                    rows,
1162                    ..QueryResult::empty()
1163                })
1164            }
1165            #[cfg(not(feature = "lpg"))]
1166            _ => Err(grafeo_common::utils::error::Error::Internal(
1167                "This command requires the `lpg` feature".to_string(),
1168            )),
1169        }
1170    }
1171
1172    /// Logs a WAL record for a schema change (no-op if WAL is not enabled).
1173    #[cfg(feature = "wal")]
1174    fn log_schema_wal(&self, record: &grafeo_storage::wal::WalRecord) {
1175        if let Some(ref wal) = self.wal
1176            && let Err(e) = wal.log(record)
1177        {
1178            grafeo_warn!("Failed to log schema change to WAL: {}", e);
1179        }
1180    }
1181
1182    /// Executes a schema DDL command, returning a status result.
1183    #[cfg(all(feature = "lpg", feature = "gql"))]
1184    fn execute_schema_command(
1185        &self,
1186        cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
1187    ) -> Result<QueryResult> {
1188        use crate::catalog::{
1189            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
1190        };
1191        use grafeo_adapters::query::gql::ast::SchemaStatement;
1192        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
1193        #[cfg(feature = "wal")]
1194        use grafeo_storage::wal::WalRecord;
1195
1196        /// Logs a WAL record for schema changes. Compiles to nothing without `wal`.
1197        macro_rules! wal_log {
1198            ($self:expr, $record:expr) => {
1199                #[cfg(feature = "wal")]
1200                $self.log_schema_wal(&$record);
1201            };
1202        }
1203
1204        let result = match cmd {
1205            SchemaStatement::CreateNodeType(stmt) => {
1206                let effective_name = self.effective_type_key(&stmt.name);
1207                #[cfg(feature = "wal")]
1208                let props_for_wal: Vec<(String, String, bool)> = stmt
1209                    .properties
1210                    .iter()
1211                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1212                    .collect();
1213                let def = NodeTypeDefinition {
1214                    name: effective_name.clone(),
1215                    properties: stmt
1216                        .properties
1217                        .iter()
1218                        .map(|p| TypedProperty {
1219                            name: p.name.clone(),
1220                            data_type: PropertyDataType::from_type_name(&p.data_type),
1221                            nullable: p.nullable,
1222                            default_value: p
1223                                .default_value
1224                                .as_ref()
1225                                .map(|s| parse_default_literal(s)),
1226                        })
1227                        .collect(),
1228                    constraints: Vec::new(),
1229                    parent_types: stmt.parent_types.clone(),
1230                };
1231                let result = if stmt.or_replace {
1232                    let _ = self.catalog.drop_node_type(&effective_name);
1233                    self.catalog.register_node_type(def)
1234                } else {
1235                    self.catalog.register_node_type(def)
1236                };
1237                match result {
1238                    Ok(()) => {
1239                        wal_log!(
1240                            self,
1241                            WalRecord::CreateNodeType {
1242                                name: effective_name.clone(),
1243                                properties: props_for_wal,
1244                                constraints: Vec::new(),
1245                            }
1246                        );
1247                        Ok(QueryResult::status(format!(
1248                            "Created node type '{}'",
1249                            stmt.name
1250                        )))
1251                    }
1252                    Err(e) if stmt.if_not_exists => {
1253                        let _ = e;
1254                        Ok(QueryResult::status("No change"))
1255                    }
1256                    Err(e) => Err(Error::Query(QueryError::new(
1257                        QueryErrorKind::Semantic,
1258                        e.to_string(),
1259                    ))),
1260                }
1261            }
1262            SchemaStatement::CreateEdgeType(stmt) => {
1263                let effective_name = self.effective_type_key(&stmt.name);
1264                #[cfg(feature = "wal")]
1265                let props_for_wal: Vec<(String, String, bool)> = stmt
1266                    .properties
1267                    .iter()
1268                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1269                    .collect();
1270                let def = EdgeTypeDefinition {
1271                    name: effective_name.clone(),
1272                    properties: stmt
1273                        .properties
1274                        .iter()
1275                        .map(|p| TypedProperty {
1276                            name: p.name.clone(),
1277                            data_type: PropertyDataType::from_type_name(&p.data_type),
1278                            nullable: p.nullable,
1279                            default_value: p
1280                                .default_value
1281                                .as_ref()
1282                                .map(|s| parse_default_literal(s)),
1283                        })
1284                        .collect(),
1285                    constraints: Vec::new(),
1286                    source_node_types: stmt.source_node_types.clone(),
1287                    target_node_types: stmt.target_node_types.clone(),
1288                };
1289                let result = if stmt.or_replace {
1290                    let _ = self.catalog.drop_edge_type_def(&effective_name);
1291                    self.catalog.register_edge_type_def(def)
1292                } else {
1293                    self.catalog.register_edge_type_def(def)
1294                };
1295                match result {
1296                    Ok(()) => {
1297                        wal_log!(
1298                            self,
1299                            WalRecord::CreateEdgeType {
1300                                name: effective_name.clone(),
1301                                properties: props_for_wal,
1302                                constraints: Vec::new(),
1303                            }
1304                        );
1305                        Ok(QueryResult::status(format!(
1306                            "Created edge type '{}'",
1307                            stmt.name
1308                        )))
1309                    }
1310                    Err(e) if stmt.if_not_exists => {
1311                        let _ = e;
1312                        Ok(QueryResult::status("No change"))
1313                    }
1314                    Err(e) => Err(Error::Query(QueryError::new(
1315                        QueryErrorKind::Semantic,
1316                        e.to_string(),
1317                    ))),
1318                }
1319            }
1320            SchemaStatement::CreateVectorIndex(stmt) => {
1321                Self::create_vector_index_on_store(
1322                    &self.active_lpg_store(),
1323                    &stmt.node_label,
1324                    &stmt.property,
1325                    stmt.dimensions,
1326                    stmt.metric.as_deref(),
1327                )?;
1328                wal_log!(
1329                    self,
1330                    WalRecord::CreateIndex {
1331                        name: stmt.name.clone(),
1332                        label: stmt.node_label.clone(),
1333                        property: stmt.property.clone(),
1334                        index_type: "vector".to_string(),
1335                    }
1336                );
1337                Ok(QueryResult::status(format!(
1338                    "Created vector index '{}'",
1339                    stmt.name
1340                )))
1341            }
1342            SchemaStatement::DropNodeType { name, if_exists } => {
1343                let effective_name = self.effective_type_key(&name);
1344                match self.catalog.drop_node_type(&effective_name) {
1345                    Ok(()) => {
1346                        wal_log!(
1347                            self,
1348                            WalRecord::DropNodeType {
1349                                name: effective_name
1350                            }
1351                        );
1352                        Ok(QueryResult::status(format!("Dropped node type '{name}'")))
1353                    }
1354                    Err(e) if if_exists => {
1355                        let _ = e;
1356                        Ok(QueryResult::status("No change"))
1357                    }
1358                    Err(e) => Err(Error::Query(QueryError::new(
1359                        QueryErrorKind::Semantic,
1360                        e.to_string(),
1361                    ))),
1362                }
1363            }
1364            SchemaStatement::DropEdgeType { name, if_exists } => {
1365                let effective_name = self.effective_type_key(&name);
1366                match self.catalog.drop_edge_type_def(&effective_name) {
1367                    Ok(()) => {
1368                        wal_log!(
1369                            self,
1370                            WalRecord::DropEdgeType {
1371                                name: effective_name
1372                            }
1373                        );
1374                        Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
1375                    }
1376                    Err(e) if if_exists => {
1377                        let _ = e;
1378                        Ok(QueryResult::status("No change"))
1379                    }
1380                    Err(e) => Err(Error::Query(QueryError::new(
1381                        QueryErrorKind::Semantic,
1382                        e.to_string(),
1383                    ))),
1384                }
1385            }
1386            SchemaStatement::CreateIndex(stmt) => {
1387                use crate::catalog::IndexType as CatalogIndexType;
1388                use grafeo_adapters::query::gql::ast::IndexKind;
1389                let active = self.active_lpg_store();
1390                let index_type_str = match stmt.index_kind {
1391                    IndexKind::Property => "property",
1392                    IndexKind::BTree => "btree",
1393                    IndexKind::Text => "text",
1394                    IndexKind::Vector => "vector",
1395                };
1396                match stmt.index_kind {
1397                    IndexKind::Property | IndexKind::BTree => {
1398                        for prop in &stmt.properties {
1399                            active.create_property_index(prop);
1400                        }
1401                    }
1402                    IndexKind::Text => {
1403                        for prop in &stmt.properties {
1404                            Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1405                        }
1406                    }
1407                    IndexKind::Vector => {
1408                        for prop in &stmt.properties {
1409                            Self::create_vector_index_on_store(
1410                                &active,
1411                                &stmt.label,
1412                                prop,
1413                                stmt.options.dimensions,
1414                                stmt.options.metric.as_deref(),
1415                            )?;
1416                        }
1417                    }
1418                }
1419                // Register each property index in the catalog for SHOW INDEXES
1420                // and DROP INDEX name-based lookup.
1421                let catalog_index_type = match stmt.index_kind {
1422                    IndexKind::Property => CatalogIndexType::Hash,
1423                    IndexKind::BTree => CatalogIndexType::BTree,
1424                    IndexKind::Text => CatalogIndexType::FullText,
1425                    IndexKind::Vector => CatalogIndexType::Hash,
1426                };
1427                let label_id = self.catalog.get_or_create_label(&stmt.label);
1428                for prop in &stmt.properties {
1429                    let prop_id = self.catalog.get_or_create_property_key(prop);
1430                    self.catalog
1431                        .create_index(&stmt.name, label_id, prop_id, catalog_index_type);
1432                }
1433                #[cfg(feature = "wal")]
1434                for prop in &stmt.properties {
1435                    wal_log!(
1436                        self,
1437                        WalRecord::CreateIndex {
1438                            name: stmt.name.clone(),
1439                            label: stmt.label.clone(),
1440                            property: prop.clone(),
1441                            index_type: index_type_str.to_string(),
1442                        }
1443                    );
1444                }
1445                Ok(QueryResult::status(format!(
1446                    "Created {} index '{}'",
1447                    index_type_str, stmt.name
1448                )))
1449            }
1450            SchemaStatement::DropIndex { name, if_exists } => {
1451                // Look up the index by name in the catalog to find the
1452                // underlying property, then drop from both catalog and store.
1453                if let Some(index_id) = self.catalog.find_index_by_name(&name) {
1454                    let def = self.catalog.get_index(index_id);
1455                    self.catalog.drop_index(index_id);
1456                    if let Some(def) = def
1457                        && let Some(prop_name) =
1458                            self.catalog.get_property_key_name(def.property_key)
1459                    {
1460                        self.active_lpg_store().drop_property_index(&prop_name);
1461                    }
1462                    wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1463                    Ok(QueryResult::status(format!("Dropped index '{name}'")))
1464                } else if if_exists {
1465                    Ok(QueryResult::status("No change".to_string()))
1466                } else {
1467                    Err(Error::Query(QueryError::new(
1468                        QueryErrorKind::Semantic,
1469                        format!("Index '{name}' does not exist"),
1470                    )))
1471                }
1472            }
1473            SchemaStatement::CreateConstraint(stmt) => {
1474                use crate::catalog::TypeConstraint;
1475                use grafeo_adapters::query::gql::ast::ConstraintKind;
1476                let kind_str = match stmt.constraint_kind {
1477                    ConstraintKind::Unique => "unique",
1478                    ConstraintKind::NodeKey => "node_key",
1479                    ConstraintKind::NotNull => "not_null",
1480                    ConstraintKind::Exists => "exists",
1481                };
1482                let constraint_name = stmt
1483                    .name
1484                    .clone()
1485                    .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1486
1487                // Register constraint in catalog type definitions
1488                match stmt.constraint_kind {
1489                    ConstraintKind::Unique => {
1490                        for prop in &stmt.properties {
1491                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1492                            let prop_id = self.catalog.get_or_create_property_key(prop);
1493                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1494                        }
1495                        let _ = self.catalog.add_constraint_to_type(
1496                            &stmt.label,
1497                            TypeConstraint::Unique(stmt.properties.clone()),
1498                        );
1499                    }
1500                    ConstraintKind::NodeKey => {
1501                        for prop in &stmt.properties {
1502                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1503                            let prop_id = self.catalog.get_or_create_property_key(prop);
1504                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1505                            let _ = self.catalog.add_required_property(label_id, prop_id);
1506                        }
1507                        let _ = self.catalog.add_constraint_to_type(
1508                            &stmt.label,
1509                            TypeConstraint::PrimaryKey(stmt.properties.clone()),
1510                        );
1511                    }
1512                    ConstraintKind::NotNull | ConstraintKind::Exists => {
1513                        for prop in &stmt.properties {
1514                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1515                            let prop_id = self.catalog.get_or_create_property_key(prop);
1516                            let _ = self.catalog.add_required_property(label_id, prop_id);
1517                            let _ = self.catalog.add_constraint_to_type(
1518                                &stmt.label,
1519                                TypeConstraint::NotNull(prop.clone()),
1520                            );
1521                        }
1522                    }
1523                }
1524
1525                wal_log!(
1526                    self,
1527                    WalRecord::CreateConstraint {
1528                        name: constraint_name.clone(),
1529                        label: stmt.label.clone(),
1530                        properties: stmt.properties.clone(),
1531                        kind: kind_str.to_string(),
1532                    }
1533                );
1534                Ok(QueryResult::status(format!(
1535                    "Created {kind_str} constraint '{constraint_name}'"
1536                )))
1537            }
1538            SchemaStatement::DropConstraint { name, if_exists } => {
1539                let _ = if_exists;
1540                wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1541                Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1542            }
1543            SchemaStatement::CreateGraphType(stmt) => {
1544                use crate::catalog::GraphTypeDefinition;
1545                use grafeo_adapters::query::gql::ast::InlineElementType;
1546
1547                let effective_name = self.effective_type_key(&stmt.name);
1548
1549                // GG04: LIKE clause copies type from existing graph
1550                let (mut node_types, mut edge_types, open) =
1551                    if let Some(ref like_graph) = stmt.like_graph {
1552                        // Infer types from the graph's bound type, or use its existing types
1553                        if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1554                            if let Some(existing) = self
1555                                .catalog
1556                                .schema()
1557                                .and_then(|s| s.get_graph_type(&type_name))
1558                            {
1559                                (
1560                                    existing.allowed_node_types.clone(),
1561                                    existing.allowed_edge_types.clone(),
1562                                    existing.open,
1563                                )
1564                            } else {
1565                                (Vec::new(), Vec::new(), true)
1566                            }
1567                        } else {
1568                            // GG22: Infer from graph data (labels used in graph)
1569                            let nt = self.catalog.all_node_type_names();
1570                            let et = self.catalog.all_edge_type_names();
1571                            if nt.is_empty() && et.is_empty() {
1572                                (Vec::new(), Vec::new(), true)
1573                            } else {
1574                                (nt, et, false)
1575                            }
1576                        }
1577                    } else {
1578                        // Prefix element type names with schema for consistency
1579                        let nt = stmt
1580                            .node_types
1581                            .iter()
1582                            .map(|n| self.effective_type_key(n))
1583                            .collect();
1584                        let et = stmt
1585                            .edge_types
1586                            .iter()
1587                            .map(|n| self.effective_type_key(n))
1588                            .collect();
1589                        (nt, et, stmt.open)
1590                    };
1591
1592                // GG03: Process inline element type entries. Per ISO/IEC 39075,
1593                // a bare `NODE TYPE Name` or `EDGE TYPE Name` inside a graph
1594                // type body is a reference; anything with a property block or
1595                // a KEY clause is an inline declaration. See issue #316.
1596                for inline in &stmt.inline_types {
1597                    match inline {
1598                        InlineElementType::Node {
1599                            name,
1600                            properties,
1601                            key_labels,
1602                            is_reference,
1603                            ..
1604                        } => {
1605                            let inline_effective = self.effective_type_key(name);
1606                            if *is_reference {
1607                                // Reference: validate existence; do not register, do not WAL.
1608                                if self.catalog.get_node_type(&inline_effective).is_none() {
1609                                    return Err(Error::Query(QueryError::new(
1610                                        QueryErrorKind::Semantic,
1611                                        format!(
1612                                            "Referenced node type '{inline_effective}' does not exist"
1613                                        ),
1614                                    )));
1615                                }
1616                            } else {
1617                                let def = NodeTypeDefinition {
1618                                    name: inline_effective.clone(),
1619                                    properties: properties
1620                                        .iter()
1621                                        .map(|p| TypedProperty {
1622                                            name: p.name.clone(),
1623                                            data_type: PropertyDataType::from_type_name(
1624                                                &p.data_type,
1625                                            ),
1626                                            nullable: p.nullable,
1627                                            default_value: None,
1628                                        })
1629                                        .collect(),
1630                                    constraints: Vec::new(),
1631                                    parent_types: key_labels.clone(),
1632                                };
1633                                self.catalog.register_or_replace_node_type(def);
1634                                #[cfg(feature = "wal")]
1635                                {
1636                                    let props_for_wal: Vec<(String, String, bool)> = properties
1637                                        .iter()
1638                                        .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1639                                        .collect();
1640                                    self.log_schema_wal(&WalRecord::CreateNodeType {
1641                                        name: inline_effective.clone(),
1642                                        properties: props_for_wal,
1643                                        constraints: Vec::new(),
1644                                    });
1645                                }
1646                            }
1647                            if !node_types.contains(&inline_effective) {
1648                                node_types.push(inline_effective);
1649                            }
1650                        }
1651                        InlineElementType::Edge {
1652                            name,
1653                            properties,
1654                            source_node_types,
1655                            target_node_types,
1656                            is_reference,
1657                            ..
1658                        } => {
1659                            let inline_effective = self.effective_type_key(name);
1660                            if *is_reference {
1661                                if self.catalog.get_edge_type_def(&inline_effective).is_none() {
1662                                    return Err(Error::Query(QueryError::new(
1663                                        QueryErrorKind::Semantic,
1664                                        format!(
1665                                            "Referenced edge type '{inline_effective}' does not exist"
1666                                        ),
1667                                    )));
1668                                }
1669                            } else {
1670                                let def = EdgeTypeDefinition {
1671                                    name: inline_effective.clone(),
1672                                    properties: properties
1673                                        .iter()
1674                                        .map(|p| TypedProperty {
1675                                            name: p.name.clone(),
1676                                            data_type: PropertyDataType::from_type_name(
1677                                                &p.data_type,
1678                                            ),
1679                                            nullable: p.nullable,
1680                                            default_value: None,
1681                                        })
1682                                        .collect(),
1683                                    constraints: Vec::new(),
1684                                    source_node_types: source_node_types.clone(),
1685                                    target_node_types: target_node_types.clone(),
1686                                };
1687                                self.catalog.register_or_replace_edge_type_def(def);
1688                                #[cfg(feature = "wal")]
1689                                {
1690                                    let props_for_wal: Vec<(String, String, bool)> = properties
1691                                        .iter()
1692                                        .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1693                                        .collect();
1694                                    self.log_schema_wal(&WalRecord::CreateEdgeType {
1695                                        name: inline_effective.clone(),
1696                                        properties: props_for_wal,
1697                                        constraints: Vec::new(),
1698                                    });
1699                                }
1700                            }
1701                            if !edge_types.contains(&inline_effective) {
1702                                edge_types.push(inline_effective);
1703                            }
1704                        }
1705                    }
1706                }
1707
1708                let def = GraphTypeDefinition {
1709                    name: effective_name.clone(),
1710                    allowed_node_types: node_types.clone(),
1711                    allowed_edge_types: edge_types.clone(),
1712                    open,
1713                };
1714                let result = if stmt.or_replace {
1715                    // Drop existing first, ignore error if not found
1716                    let _ = self.catalog.drop_graph_type(&effective_name);
1717                    self.catalog.register_graph_type(def)
1718                } else {
1719                    self.catalog.register_graph_type(def)
1720                };
1721                match result {
1722                    Ok(()) => {
1723                        wal_log!(
1724                            self,
1725                            WalRecord::CreateGraphType {
1726                                name: effective_name.clone(),
1727                                node_types,
1728                                edge_types,
1729                                open,
1730                            }
1731                        );
1732                        Ok(QueryResult::status(format!(
1733                            "Created graph type '{}'",
1734                            stmt.name
1735                        )))
1736                    }
1737                    Err(e) if stmt.if_not_exists => {
1738                        let _ = e;
1739                        Ok(QueryResult::status("No change"))
1740                    }
1741                    Err(e) => Err(Error::Query(QueryError::new(
1742                        QueryErrorKind::Semantic,
1743                        e.to_string(),
1744                    ))),
1745                }
1746            }
1747            SchemaStatement::DropGraphType { name, if_exists } => {
1748                let effective_name = self.effective_type_key(&name);
1749                match self.catalog.drop_graph_type(&effective_name) {
1750                    Ok(()) => {
1751                        wal_log!(
1752                            self,
1753                            WalRecord::DropGraphType {
1754                                name: effective_name
1755                            }
1756                        );
1757                        Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1758                    }
1759                    Err(e) if if_exists => {
1760                        let _ = e;
1761                        Ok(QueryResult::status("No change"))
1762                    }
1763                    Err(e) => Err(Error::Query(QueryError::new(
1764                        QueryErrorKind::Semantic,
1765                        e.to_string(),
1766                    ))),
1767                }
1768            }
1769            SchemaStatement::CreateSchema {
1770                name,
1771                if_not_exists,
1772            } => {
1773                if name.contains('/') {
1774                    return Err(Error::Query(QueryError::new(
1775                        QueryErrorKind::Semantic,
1776                        format!(
1777                            "Schema name '{name}' must not contain '/' (reserved as schema/graph separator)"
1778                        ),
1779                    )));
1780                }
1781                match self.catalog.register_schema_namespace(name.clone()) {
1782                    Ok(()) => {
1783                        wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1784                        // Auto-create the schema's default graph partition so that
1785                        // SESSION SET SCHEMA + queries work without an explicit graph.
1786                        let default_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1787                        if self.store.create_graph(&default_key).unwrap_or(false) {
1788                            wal_log!(self, WalRecord::CreateNamedGraph { name: default_key });
1789                        }
1790                        Ok(QueryResult::status(format!("Created schema '{name}'")))
1791                    }
1792                    Err(e) if if_not_exists => {
1793                        let _ = e;
1794                        Ok(QueryResult::status("No change"))
1795                    }
1796                    Err(e) => Err(Error::Query(QueryError::new(
1797                        QueryErrorKind::Semantic,
1798                        e.to_string(),
1799                    ))),
1800                }
1801            }
1802            SchemaStatement::DropSchema { name, if_exists } => {
1803                // ISO/IEC 39075 Section 12.3: schema must be empty before dropping.
1804                // The auto-created __default__ graph is exempt from the check.
1805                let prefix = format!("{name}/");
1806                let default_graph_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1807                let has_graphs = self
1808                    .store
1809                    .graph_names()
1810                    .iter()
1811                    .any(|g| g.starts_with(&prefix) && *g != default_graph_key);
1812                let has_types = self
1813                    .catalog
1814                    .all_node_type_names()
1815                    .iter()
1816                    .any(|n| n.starts_with(&prefix))
1817                    || self
1818                        .catalog
1819                        .all_edge_type_names()
1820                        .iter()
1821                        .any(|n| n.starts_with(&prefix))
1822                    || self
1823                        .catalog
1824                        .all_graph_type_names()
1825                        .iter()
1826                        .any(|n| n.starts_with(&prefix));
1827                if has_graphs || has_types {
1828                    return Err(Error::Query(QueryError::new(
1829                        QueryErrorKind::Semantic,
1830                        format!("Schema '{name}' is not empty: drop all graphs and types first"),
1831                    )));
1832                }
1833                match self.catalog.drop_schema_namespace(&name) {
1834                    Ok(()) => {
1835                        wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1836                        // Drop the auto-created default graph partition
1837                        if self.store.drop_graph(&default_graph_key) {
1838                            wal_log!(
1839                                self,
1840                                WalRecord::DropNamedGraph {
1841                                    name: default_graph_key,
1842                                }
1843                            );
1844                        }
1845                        // If this session was using the dropped schema, reset it
1846                        let mut current = self.current_schema.lock();
1847                        if current
1848                            .as_deref()
1849                            .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1850                        {
1851                            *current = None;
1852                        }
1853                        Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1854                    }
1855                    Err(e) if if_exists => {
1856                        let _ = e;
1857                        Ok(QueryResult::status("No change"))
1858                    }
1859                    Err(e) => Err(Error::Query(QueryError::new(
1860                        QueryErrorKind::Semantic,
1861                        e.to_string(),
1862                    ))),
1863                }
1864            }
1865            SchemaStatement::AlterNodeType(stmt) => {
1866                use grafeo_adapters::query::gql::ast::TypeAlteration;
1867                let effective_name = self.effective_type_key(&stmt.name);
1868                let mut wal_alts = Vec::new();
1869                for alt in &stmt.alterations {
1870                    match alt {
1871                        TypeAlteration::AddProperty(prop) => {
1872                            let typed = TypedProperty {
1873                                name: prop.name.clone(),
1874                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1875                                nullable: prop.nullable,
1876                                default_value: prop
1877                                    .default_value
1878                                    .as_ref()
1879                                    .map(|s| parse_default_literal(s)),
1880                            };
1881                            self.catalog
1882                                .alter_node_type_add_property(&effective_name, typed)
1883                                .map_err(|e| {
1884                                    Error::Query(QueryError::new(
1885                                        QueryErrorKind::Semantic,
1886                                        e.to_string(),
1887                                    ))
1888                                })?;
1889                            wal_alts.push((
1890                                "add".to_string(),
1891                                prop.name.clone(),
1892                                prop.data_type.clone(),
1893                                prop.nullable,
1894                            ));
1895                        }
1896                        TypeAlteration::DropProperty(name) => {
1897                            self.catalog
1898                                .alter_node_type_drop_property(&effective_name, name)
1899                                .map_err(|e| {
1900                                    Error::Query(QueryError::new(
1901                                        QueryErrorKind::Semantic,
1902                                        e.to_string(),
1903                                    ))
1904                                })?;
1905                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1906                        }
1907                    }
1908                }
1909                wal_log!(
1910                    self,
1911                    WalRecord::AlterNodeType {
1912                        name: effective_name,
1913                        alterations: wal_alts,
1914                    }
1915                );
1916                Ok(QueryResult::status(format!(
1917                    "Altered node type '{}'",
1918                    stmt.name
1919                )))
1920            }
1921            SchemaStatement::AlterEdgeType(stmt) => {
1922                use grafeo_adapters::query::gql::ast::TypeAlteration;
1923                let effective_name = self.effective_type_key(&stmt.name);
1924                let mut wal_alts = Vec::new();
1925                for alt in &stmt.alterations {
1926                    match alt {
1927                        TypeAlteration::AddProperty(prop) => {
1928                            let typed = TypedProperty {
1929                                name: prop.name.clone(),
1930                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1931                                nullable: prop.nullable,
1932                                default_value: prop
1933                                    .default_value
1934                                    .as_ref()
1935                                    .map(|s| parse_default_literal(s)),
1936                            };
1937                            self.catalog
1938                                .alter_edge_type_add_property(&effective_name, typed)
1939                                .map_err(|e| {
1940                                    Error::Query(QueryError::new(
1941                                        QueryErrorKind::Semantic,
1942                                        e.to_string(),
1943                                    ))
1944                                })?;
1945                            wal_alts.push((
1946                                "add".to_string(),
1947                                prop.name.clone(),
1948                                prop.data_type.clone(),
1949                                prop.nullable,
1950                            ));
1951                        }
1952                        TypeAlteration::DropProperty(name) => {
1953                            self.catalog
1954                                .alter_edge_type_drop_property(&effective_name, name)
1955                                .map_err(|e| {
1956                                    Error::Query(QueryError::new(
1957                                        QueryErrorKind::Semantic,
1958                                        e.to_string(),
1959                                    ))
1960                                })?;
1961                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1962                        }
1963                    }
1964                }
1965                wal_log!(
1966                    self,
1967                    WalRecord::AlterEdgeType {
1968                        name: effective_name,
1969                        alterations: wal_alts,
1970                    }
1971                );
1972                Ok(QueryResult::status(format!(
1973                    "Altered edge type '{}'",
1974                    stmt.name
1975                )))
1976            }
1977            SchemaStatement::AlterGraphType(stmt) => {
1978                use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1979                let effective_name = self.effective_type_key(&stmt.name);
1980                let mut wal_alts = Vec::new();
1981                for alt in &stmt.alterations {
1982                    match alt {
1983                        GraphTypeAlteration::AddNodeType(name) => {
1984                            self.catalog
1985                                .alter_graph_type_add_node_type(&effective_name, name.clone())
1986                                .map_err(|e| {
1987                                    Error::Query(QueryError::new(
1988                                        QueryErrorKind::Semantic,
1989                                        e.to_string(),
1990                                    ))
1991                                })?;
1992                            wal_alts.push(("add_node_type".to_string(), name.clone()));
1993                        }
1994                        GraphTypeAlteration::DropNodeType(name) => {
1995                            self.catalog
1996                                .alter_graph_type_drop_node_type(&effective_name, name)
1997                                .map_err(|e| {
1998                                    Error::Query(QueryError::new(
1999                                        QueryErrorKind::Semantic,
2000                                        e.to_string(),
2001                                    ))
2002                                })?;
2003                            wal_alts.push(("drop_node_type".to_string(), name.clone()));
2004                        }
2005                        GraphTypeAlteration::AddEdgeType(name) => {
2006                            self.catalog
2007                                .alter_graph_type_add_edge_type(&effective_name, name.clone())
2008                                .map_err(|e| {
2009                                    Error::Query(QueryError::new(
2010                                        QueryErrorKind::Semantic,
2011                                        e.to_string(),
2012                                    ))
2013                                })?;
2014                            wal_alts.push(("add_edge_type".to_string(), name.clone()));
2015                        }
2016                        GraphTypeAlteration::DropEdgeType(name) => {
2017                            self.catalog
2018                                .alter_graph_type_drop_edge_type(&effective_name, name)
2019                                .map_err(|e| {
2020                                    Error::Query(QueryError::new(
2021                                        QueryErrorKind::Semantic,
2022                                        e.to_string(),
2023                                    ))
2024                                })?;
2025                            wal_alts.push(("drop_edge_type".to_string(), name.clone()));
2026                        }
2027                    }
2028                }
2029                wal_log!(
2030                    self,
2031                    WalRecord::AlterGraphType {
2032                        name: effective_name,
2033                        alterations: wal_alts,
2034                    }
2035                );
2036                Ok(QueryResult::status(format!(
2037                    "Altered graph type '{}'",
2038                    stmt.name
2039                )))
2040            }
2041            SchemaStatement::CreateProcedure(stmt) => {
2042                use crate::catalog::ProcedureDefinition;
2043
2044                let def = ProcedureDefinition {
2045                    name: stmt.name.clone(),
2046                    params: stmt
2047                        .params
2048                        .iter()
2049                        .map(|p| (p.name.clone(), p.param_type.clone()))
2050                        .collect(),
2051                    returns: stmt
2052                        .returns
2053                        .iter()
2054                        .map(|r| (r.name.clone(), r.return_type.clone()))
2055                        .collect(),
2056                    body: stmt.body.clone(),
2057                };
2058
2059                if stmt.or_replace {
2060                    self.catalog.replace_procedure(def).map_err(|e| {
2061                        Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
2062                    })?;
2063                } else {
2064                    match self.catalog.register_procedure(def) {
2065                        Ok(()) => {}
2066                        Err(_) if stmt.if_not_exists => {
2067                            return Ok(QueryResult::empty());
2068                        }
2069                        Err(e) => {
2070                            return Err(Error::Query(QueryError::new(
2071                                QueryErrorKind::Semantic,
2072                                e.to_string(),
2073                            )));
2074                        }
2075                    }
2076                }
2077
2078                wal_log!(
2079                    self,
2080                    WalRecord::CreateProcedure {
2081                        name: stmt.name.clone(),
2082                        params: stmt
2083                            .params
2084                            .iter()
2085                            .map(|p| (p.name.clone(), p.param_type.clone()))
2086                            .collect(),
2087                        returns: stmt
2088                            .returns
2089                            .iter()
2090                            .map(|r| (r.name.clone(), r.return_type.clone()))
2091                            .collect(),
2092                        body: stmt.body,
2093                    }
2094                );
2095                Ok(QueryResult::status(format!(
2096                    "Created procedure '{}'",
2097                    stmt.name
2098                )))
2099            }
2100            SchemaStatement::DropProcedure { name, if_exists } => {
2101                match self.catalog.drop_procedure(&name) {
2102                    Ok(()) => {}
2103                    Err(_) if if_exists => {
2104                        return Ok(QueryResult::empty());
2105                    }
2106                    Err(e) => {
2107                        return Err(Error::Query(QueryError::new(
2108                            QueryErrorKind::Semantic,
2109                            e.to_string(),
2110                        )));
2111                    }
2112                }
2113                wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
2114                Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
2115            }
2116            SchemaStatement::ShowIndexes => {
2117                return self.execute_show_indexes();
2118            }
2119            SchemaStatement::ShowConstraints => {
2120                return self.execute_show_constraints();
2121            }
2122            SchemaStatement::ShowNodeTypes => {
2123                return self.execute_show_node_types();
2124            }
2125            SchemaStatement::ShowEdgeTypes => {
2126                return self.execute_show_edge_types();
2127            }
2128            SchemaStatement::ShowGraphTypes => {
2129                return self.execute_show_graph_types();
2130            }
2131            SchemaStatement::ShowGraphType(name) => {
2132                return self.execute_show_graph_type(&name);
2133            }
2134            SchemaStatement::ShowCurrentGraphType => {
2135                return self.execute_show_current_graph_type();
2136            }
2137            SchemaStatement::ShowGraphs => {
2138                return self.execute_show_graphs();
2139            }
2140            SchemaStatement::ShowSchemas => {
2141                return self.execute_show_schemas();
2142            }
2143        };
2144
2145        // Invalidate all cached query plans after any successful DDL change.
2146        // DDL is rare, so clearing the entire cache is cheap and correct.
2147        if result.is_ok() {
2148            self.query_cache.clear();
2149        }
2150
2151        result
2152    }
2153
2154    /// Creates a vector index on the store by scanning existing nodes.
2155    #[cfg(all(feature = "lpg", feature = "gql", feature = "vector-index"))]
2156    fn create_vector_index_on_store(
2157        store: &LpgStore,
2158        label: &str,
2159        property: &str,
2160        dimensions: Option<usize>,
2161        metric: Option<&str>,
2162    ) -> Result<()> {
2163        use grafeo_common::types::{PropertyKey, Value};
2164        use grafeo_common::utils::error::Error;
2165        use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex, VectorIndexKind};
2166
2167        let metric = match metric {
2168            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
2169                Error::Internal(format!(
2170                    "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
2171                ))
2172            })?,
2173            None => DistanceMetric::Cosine,
2174        };
2175
2176        let prop_key = PropertyKey::new(property);
2177        let mut found_dims: Option<usize> = dimensions;
2178        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
2179
2180        for node in store.nodes_with_label(label) {
2181            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
2182                if let Some(expected) = found_dims {
2183                    if v.len() != expected {
2184                        return Err(Error::Internal(format!(
2185                            "Vector dimension mismatch: expected {expected}, found {} on node {}",
2186                            v.len(),
2187                            node.id.0
2188                        )));
2189                    }
2190                } else {
2191                    found_dims = Some(v.len());
2192                }
2193                vectors.push((node.id, v.to_vec()));
2194            }
2195        }
2196
2197        let Some(dims) = found_dims else {
2198            return Err(Error::Internal(format!(
2199                "No vector properties found on :{label}({property}) and no dimensions specified"
2200            )));
2201        };
2202
2203        let config = HnswConfig::new(dims, metric);
2204        let index = HnswIndex::with_capacity(config, vectors.len());
2205        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
2206        for (node_id, vec) in &vectors {
2207            index.insert(*node_id, vec, &accessor);
2208        }
2209
2210        store.add_vector_index(label, property, Arc::new(VectorIndexKind::Hnsw(index)));
2211        Ok(())
2212    }
2213
2214    /// Stub for when vector-index feature is not enabled.
2215    #[cfg(all(feature = "lpg", feature = "gql", not(feature = "vector-index")))]
2216    fn create_vector_index_on_store(
2217        _store: &LpgStore,
2218        _label: &str,
2219        _property: &str,
2220        _dimensions: Option<usize>,
2221        _metric: Option<&str>,
2222    ) -> Result<()> {
2223        Err(grafeo_common::utils::error::Error::Internal(
2224            "Vector index support requires the 'vector-index' feature".to_string(),
2225        ))
2226    }
2227
2228    /// Creates a text index on the store by scanning existing nodes.
2229    #[cfg(all(feature = "lpg", feature = "gql", feature = "text-index"))]
2230    fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
2231        use grafeo_common::types::{PropertyKey, Value};
2232        use grafeo_core::index::text::{BM25Config, InvertedIndex};
2233
2234        let mut index = InvertedIndex::new(BM25Config::default());
2235        let prop_key = PropertyKey::new(property);
2236
2237        let nodes = store.nodes_by_label(label);
2238        for node_id in nodes {
2239            if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
2240                index.insert(node_id, text.as_str());
2241            }
2242        }
2243
2244        store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
2245        Ok(())
2246    }
2247
2248    /// Stub for when text-index feature is not enabled.
2249    #[cfg(all(feature = "lpg", feature = "gql", not(feature = "text-index")))]
2250    fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
2251        Err(grafeo_common::utils::error::Error::Internal(
2252            "Text index support requires the 'text-index' feature".to_string(),
2253        ))
2254    }
2255
2256    /// Returns a table of all indexes from the catalog.
2257    fn execute_show_indexes(&self) -> Result<QueryResult> {
2258        let indexes = self.catalog.all_indexes();
2259        let columns = vec![
2260            "name".to_string(),
2261            "type".to_string(),
2262            "label".to_string(),
2263            "property".to_string(),
2264        ];
2265        let rows: Vec<Vec<Value>> = indexes
2266            .into_iter()
2267            .map(|def| {
2268                let label_name = self
2269                    .catalog
2270                    .get_label_name(def.label)
2271                    .unwrap_or_else(|| "?".into());
2272                let prop_name = self
2273                    .catalog
2274                    .get_property_key_name(def.property_key)
2275                    .unwrap_or_else(|| "?".into());
2276                vec![
2277                    Value::from(def.name),
2278                    Value::from(format!("{:?}", def.index_type)),
2279                    Value::from(&*label_name),
2280                    Value::from(&*prop_name),
2281                ]
2282            })
2283            .collect();
2284        Ok(QueryResult {
2285            columns,
2286            column_types: Vec::new(),
2287            rows,
2288            ..QueryResult::empty()
2289        })
2290    }
2291
2292    /// Returns a table of all constraints (currently metadata-only).
2293    fn execute_show_constraints(&self) -> Result<QueryResult> {
2294        // Constraints are tracked in WAL but not yet in a queryable catalog.
2295        // Return an empty table with the expected schema.
2296        Ok(QueryResult {
2297            columns: vec![
2298                "name".to_string(),
2299                "type".to_string(),
2300                "label".to_string(),
2301                "properties".to_string(),
2302            ],
2303            column_types: Vec::new(),
2304            rows: Vec::new(),
2305            ..QueryResult::empty()
2306        })
2307    }
2308
2309    /// Returns a table of all registered node types in the current schema.
2310    fn execute_show_node_types(&self) -> Result<QueryResult> {
2311        let columns = vec![
2312            "name".to_string(),
2313            "properties".to_string(),
2314            "constraints".to_string(),
2315            "parents".to_string(),
2316        ];
2317        let schema = self.current_schema.lock().clone();
2318        let all_names = self.catalog.all_node_type_names();
2319        let type_names: Vec<String> = match &schema {
2320            Some(s) => {
2321                let prefix = format!("{s}/");
2322                all_names
2323                    .into_iter()
2324                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2325                    .collect()
2326            }
2327            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2328        };
2329        let rows: Vec<Vec<Value>> = type_names
2330            .into_iter()
2331            .filter_map(|name| {
2332                let lookup = match &schema {
2333                    Some(s) => format!("{s}/{name}"),
2334                    None => name.clone(),
2335                };
2336                let def = self.catalog.get_node_type(&lookup)?;
2337                let props: Vec<String> = def
2338                    .properties
2339                    .iter()
2340                    .map(|p| {
2341                        let nullable = if p.nullable { "" } else { " NOT NULL" };
2342                        format!("{} {}{}", p.name, p.data_type, nullable)
2343                    })
2344                    .collect();
2345                let constraints: Vec<String> =
2346                    def.constraints.iter().map(|c| format!("{c:?}")).collect();
2347                let parents = def.parent_types.join(", ");
2348                Some(vec![
2349                    Value::from(name),
2350                    Value::from(props.join(", ")),
2351                    Value::from(constraints.join(", ")),
2352                    Value::from(parents),
2353                ])
2354            })
2355            .collect();
2356        Ok(QueryResult {
2357            columns,
2358            column_types: Vec::new(),
2359            rows,
2360            ..QueryResult::empty()
2361        })
2362    }
2363
2364    /// Returns a table of all registered edge types in the current schema.
2365    fn execute_show_edge_types(&self) -> Result<QueryResult> {
2366        let columns = vec![
2367            "name".to_string(),
2368            "properties".to_string(),
2369            "source_types".to_string(),
2370            "target_types".to_string(),
2371        ];
2372        let schema = self.current_schema.lock().clone();
2373        let all_names = self.catalog.all_edge_type_names();
2374        let type_names: Vec<String> = match &schema {
2375            Some(s) => {
2376                let prefix = format!("{s}/");
2377                all_names
2378                    .into_iter()
2379                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2380                    .collect()
2381            }
2382            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2383        };
2384        let rows: Vec<Vec<Value>> = type_names
2385            .into_iter()
2386            .filter_map(|name| {
2387                let lookup = match &schema {
2388                    Some(s) => format!("{s}/{name}"),
2389                    None => name.clone(),
2390                };
2391                let def = self.catalog.get_edge_type_def(&lookup)?;
2392                let props: Vec<String> = def
2393                    .properties
2394                    .iter()
2395                    .map(|p| {
2396                        let nullable = if p.nullable { "" } else { " NOT NULL" };
2397                        format!("{} {}{}", p.name, p.data_type, nullable)
2398                    })
2399                    .collect();
2400                let src = def.source_node_types.join(", ");
2401                let tgt = def.target_node_types.join(", ");
2402                Some(vec![
2403                    Value::from(name),
2404                    Value::from(props.join(", ")),
2405                    Value::from(src),
2406                    Value::from(tgt),
2407                ])
2408            })
2409            .collect();
2410        Ok(QueryResult {
2411            columns,
2412            column_types: Vec::new(),
2413            rows,
2414            ..QueryResult::empty()
2415        })
2416    }
2417
2418    /// Returns a table of all registered graph types in the current schema.
2419    fn execute_show_graph_types(&self) -> Result<QueryResult> {
2420        let columns = vec![
2421            "name".to_string(),
2422            "open".to_string(),
2423            "node_types".to_string(),
2424            "edge_types".to_string(),
2425        ];
2426        let schema = self.current_schema.lock().clone();
2427        let all_names = self.catalog.all_graph_type_names();
2428        let type_names: Vec<String> = match &schema {
2429            Some(s) => {
2430                let prefix = format!("{s}/");
2431                all_names
2432                    .into_iter()
2433                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2434                    .collect()
2435            }
2436            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2437        };
2438        let rows: Vec<Vec<Value>> = type_names
2439            .into_iter()
2440            .filter_map(|name| {
2441                let lookup = match &schema {
2442                    Some(s) => format!("{s}/{name}"),
2443                    None => name.clone(),
2444                };
2445                let def = self.catalog.get_graph_type_def(&lookup)?;
2446                // Strip schema prefix from allowed type names for display
2447                let strip = |n: &String| -> String {
2448                    match &schema {
2449                        Some(s) => n.strip_prefix(&format!("{s}/")).unwrap_or(n).to_string(),
2450                        None => n.clone(),
2451                    }
2452                };
2453                let node_types: Vec<String> = def.allowed_node_types.iter().map(strip).collect();
2454                let edge_types: Vec<String> = def.allowed_edge_types.iter().map(strip).collect();
2455                Some(vec![
2456                    Value::from(name),
2457                    Value::from(def.open),
2458                    Value::from(node_types.join(", ")),
2459                    Value::from(edge_types.join(", ")),
2460                ])
2461            })
2462            .collect();
2463        Ok(QueryResult {
2464            columns,
2465            column_types: Vec::new(),
2466            rows,
2467            ..QueryResult::empty()
2468        })
2469    }
2470
2471    /// Returns the list of named graphs visible in the current schema context.
2472    ///
2473    /// When a session schema is set, only graphs belonging to that schema are
2474    /// shown (their compound prefix is stripped). When no schema is set, graphs
2475    /// without a schema prefix are shown (the default schema).
2476    #[cfg(feature = "lpg")]
2477    fn execute_show_graphs(&self) -> Result<QueryResult> {
2478        let schema = self.current_schema.lock().clone();
2479        let all_names = self.store.graph_names();
2480
2481        let mut names: Vec<String> = match &schema {
2482            Some(s) => {
2483                let prefix = format!("{s}/");
2484                all_names
2485                    .into_iter()
2486                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2487                    .filter(|n| n != SCHEMA_DEFAULT_GRAPH)
2488                    .collect()
2489            }
2490            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2491        };
2492        names.sort();
2493
2494        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2495        Ok(QueryResult {
2496            columns: vec!["name".to_string()],
2497            column_types: Vec::new(),
2498            rows,
2499            ..QueryResult::empty()
2500        })
2501    }
2502
2503    /// Returns the list of all schema namespaces.
2504    fn execute_show_schemas(&self) -> Result<QueryResult> {
2505        let mut names = self.catalog.schema_names();
2506        names.sort();
2507        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2508        Ok(QueryResult {
2509            columns: vec!["name".to_string()],
2510            column_types: Vec::new(),
2511            rows,
2512            ..QueryResult::empty()
2513        })
2514    }
2515
2516    /// Returns detailed info for a specific graph type.
2517    fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
2518        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
2519
2520        let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
2521            Error::Query(QueryError::new(
2522                QueryErrorKind::Semantic,
2523                format!("Graph type '{name}' not found"),
2524            ))
2525        })?;
2526
2527        let columns = vec![
2528            "name".to_string(),
2529            "open".to_string(),
2530            "node_types".to_string(),
2531            "edge_types".to_string(),
2532        ];
2533        let rows = vec![vec![
2534            Value::from(def.name),
2535            Value::from(def.open),
2536            Value::from(def.allowed_node_types.join(", ")),
2537            Value::from(def.allowed_edge_types.join(", ")),
2538        ]];
2539        Ok(QueryResult {
2540            columns,
2541            column_types: Vec::new(),
2542            rows,
2543            ..QueryResult::empty()
2544        })
2545    }
2546
2547    /// Returns the graph type bound to the current graph.
2548    fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
2549        let graph_name = self
2550            .current_graph()
2551            .unwrap_or_else(|| "default".to_string());
2552        let columns = vec![
2553            "graph".to_string(),
2554            "graph_type".to_string(),
2555            "open".to_string(),
2556            "node_types".to_string(),
2557            "edge_types".to_string(),
2558        ];
2559
2560        if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
2561            && let Some(def) = self.catalog.get_graph_type_def(&type_name)
2562        {
2563            let rows = vec![vec![
2564                Value::from(graph_name),
2565                Value::from(type_name),
2566                Value::from(def.open),
2567                Value::from(def.allowed_node_types.join(", ")),
2568                Value::from(def.allowed_edge_types.join(", ")),
2569            ]];
2570            return Ok(QueryResult {
2571                columns,
2572                column_types: Vec::new(),
2573                rows,
2574                ..QueryResult::empty()
2575            });
2576        }
2577
2578        // No graph type binding found
2579        Ok(QueryResult {
2580            columns,
2581            column_types: Vec::new(),
2582            rows: vec![vec![
2583                Value::from(graph_name),
2584                Value::Null,
2585                Value::Null,
2586                Value::Null,
2587                Value::Null,
2588            ]],
2589            ..QueryResult::empty()
2590        })
2591    }
2592
2593    /// Executes a GQL query.
2594    ///
2595    /// # Errors
2596    ///
2597    /// Returns an error if the query fails to parse or execute.
2598    ///
2599    /// # Examples
2600    ///
2601    /// ```no_run
2602    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2603    /// use grafeo_engine::GrafeoDB;
2604    ///
2605    /// let db = GrafeoDB::new_in_memory();
2606    /// let session = db.session();
2607    ///
2608    /// // Create a node
2609    /// session.execute("INSERT (:Person {name: 'Alix', age: 30})")?;
2610    ///
2611    /// // Query nodes
2612    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
2613    /// for row in result.rows() {
2614    ///     println!("{:?}", row);
2615    /// }
2616    /// # Ok(())
2617    /// # }
2618    /// ```
2619    #[cfg(feature = "gql")]
2620    pub fn execute(&self, query: &str) -> Result<QueryResult> {
2621        self.require_lpg("GQL")?;
2622
2623        #[cfg(feature = "testing-statement-injection")]
2624        grafeo_common::testing::statement_failure::maybe_fail_statement().map_err(|e| {
2625            grafeo_common::utils::error::Error::Internal(format!("injected failure: {e}"))
2626        })?;
2627
2628        use crate::query::{
2629            binder::Binder, cache::CacheKey, optimizer::Optimizer, processor::QueryLanguage,
2630            translators::gql,
2631        };
2632
2633        let _span = grafeo_info_span!(
2634            "grafeo::session::execute",
2635            language = "gql",
2636            query_len = query.len(),
2637        );
2638
2639        #[cfg(not(target_arch = "wasm32"))]
2640        let start_time = std::time::Instant::now();
2641
2642        // Parse and translate, checking for session/schema commands first
2643        let translation = gql::translate_full(query)?;
2644        let logical_plan = match translation {
2645            gql::GqlTranslationResult::SessionCommand(cmd) => {
2646                return self.execute_session_command(cmd);
2647            }
2648            #[cfg(feature = "lpg")]
2649            gql::GqlTranslationResult::SchemaCommand(cmd) => {
2650                // All DDL requires Admin role
2651                self.require_permission(crate::auth::StatementKind::Admin)?;
2652                if *self.read_only_tx.lock() {
2653                    return Err(grafeo_common::utils::error::Error::Transaction(
2654                        grafeo_common::utils::error::TransactionError::ReadOnly,
2655                    ));
2656                }
2657                return self.execute_schema_command(cmd);
2658            }
2659            gql::GqlTranslationResult::Plan(plan) => {
2660                // Only walk the operator tree when it matters: non-admin
2661                // identities need permission checks, read-only transactions
2662                // need mutation blocking. Admin sessions in auto-commit mode
2663                // skip the tree walk entirely.
2664                let read_only = *self.read_only_tx.lock();
2665                let need_check = read_only || !self.identity.can_admin();
2666                let is_mutation = need_check && plan.root.has_mutations();
2667                if is_mutation {
2668                    self.require_permission(crate::auth::StatementKind::Write)?;
2669                }
2670                if read_only && is_mutation {
2671                    return Err(grafeo_common::utils::error::Error::Transaction(
2672                        grafeo_common::utils::error::TransactionError::ReadOnly,
2673                    ));
2674                }
2675                plan
2676            }
2677            #[cfg(not(feature = "lpg"))]
2678            gql::GqlTranslationResult::SchemaCommand(_) => {
2679                return Err(grafeo_common::utils::error::Error::Internal(
2680                    "Schema commands require the `lpg` feature".to_string(),
2681                ));
2682            }
2683        };
2684
2685        // Create cache key for this query
2686        let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
2687
2688        // Try to get cached optimized plan, or use the plan we just translated
2689        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2690            cached_plan
2691        } else {
2692            // Semantic validation
2693            let mut binder = Binder::new();
2694            let _binding_context = binder.bind(&logical_plan)?;
2695
2696            // Optimize the plan
2697            let active = self.active_store();
2698            let optimizer = Optimizer::from_graph_store(&*active);
2699            let plan = optimizer.optimize(logical_plan)?;
2700
2701            // Cache the optimized plan for future use
2702            self.query_cache.put_optimized(cache_key, plan.clone());
2703
2704            plan
2705        };
2706
2707        // Resolve the active store for query execution
2708        let active = self.active_store();
2709
2710        // EXPLAIN: annotate pushdown hints and return the plan tree
2711        if optimized_plan.explain {
2712            use crate::query::processor::{annotate_pushdown_hints, explain_result};
2713            let mut plan = optimized_plan;
2714            annotate_pushdown_hints(&mut plan.root, active.as_ref());
2715            return Ok(explain_result(&plan));
2716        }
2717
2718        // PROFILE: execute with per-operator instrumentation
2719        if optimized_plan.profile {
2720            let has_mutations = optimized_plan.root.has_mutations();
2721            return self.with_auto_commit(has_mutations, || {
2722                let (viewing_epoch, transaction_id) = self.get_transaction_context();
2723                let planner = self.create_planner_for_store(
2724                    Arc::clone(&active),
2725                    viewing_epoch,
2726                    transaction_id,
2727                );
2728                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2729
2730                let executor = self.make_executor(physical_plan.columns.clone());
2731                let _result = executor.execute(physical_plan.operator.as_mut())?;
2732
2733                let total_time_ms;
2734                #[cfg(not(target_arch = "wasm32"))]
2735                {
2736                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2737                }
2738                #[cfg(target_arch = "wasm32")]
2739                {
2740                    total_time_ms = 0.0;
2741                }
2742
2743                let profile_tree = crate::query::profile::build_profile_tree(
2744                    &optimized_plan.root,
2745                    &mut entries.into_iter(),
2746                );
2747                Ok(crate::query::profile::profile_result(
2748                    &profile_tree,
2749                    total_time_ms,
2750                ))
2751            });
2752        }
2753
2754        let has_mutations = optimized_plan.root.has_mutations();
2755
2756        let result = self.with_auto_commit(has_mutations, || {
2757            // Get transaction context for MVCC visibility
2758            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2759
2760            // Convert to physical plan with transaction context
2761            // (Physical planning cannot be cached as it depends on transaction state)
2762            // Safe to use read-only fast path when: this query has no mutations AND
2763            // there is no active transaction that may have prior uncommitted writes.
2764            let has_active_tx = self.current_transaction.lock().is_some();
2765            let read_only = !has_mutations && !has_active_tx;
2766            let planner = self.create_planner_for_store_with_read_only(
2767                Arc::clone(&active),
2768                viewing_epoch,
2769                transaction_id,
2770                read_only,
2771            );
2772            let physical_plan = planner.plan(&optimized_plan)?;
2773
2774            // Execute the plan via push-based pipeline when possible
2775            let executor = self.make_executor(physical_plan.columns.clone());
2776            let (mut source, push_ops) = {
2777                #[cfg(feature = "spill")]
2778                {
2779                    let memory_ctx = self.make_operator_memory_context();
2780                    grafeo_core::execution::pipeline_convert::convert_to_pipeline_with_memory(
2781                        physical_plan.into_operator(),
2782                        memory_ctx,
2783                    )
2784                }
2785                #[cfg(not(feature = "spill"))]
2786                {
2787                    grafeo_core::execution::pipeline_convert::convert_to_pipeline(
2788                        physical_plan.into_operator(),
2789                    )
2790                }
2791            };
2792            let mut result = if push_ops.is_empty() {
2793                // Pure source query: use traditional pull-based execution
2794                executor.execute(source.as_mut())?
2795            } else {
2796                // Pipeline execution: push data from source through operators
2797                executor.execute_pipeline(source, push_ops)?
2798            };
2799
2800            // Add execution metrics
2801            let rows_scanned = result.rows.len() as u64;
2802            #[cfg(not(target_arch = "wasm32"))]
2803            {
2804                let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2805                result.execution_time_ms = Some(elapsed_ms);
2806            }
2807            result.rows_scanned = Some(rows_scanned);
2808
2809            Ok(result)
2810        });
2811
2812        // Record metrics for this query execution.
2813        #[cfg(feature = "metrics")]
2814        {
2815            #[cfg(not(target_arch = "wasm32"))]
2816            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2817            #[cfg(target_arch = "wasm32")]
2818            let elapsed_ms = None;
2819            self.record_query_metrics("gql", elapsed_ms, &result);
2820        }
2821
2822        result
2823    }
2824
2825    /// Executes a GQL query and returns a lazy result stream.
2826    ///
2827    /// The stream pulls chunks from the operator pipeline on demand. Use it
2828    /// when the result set is too large to fit in memory or when you want
2829    /// first-row latency (process rows as they arrive rather than waiting
2830    /// for the whole query to complete).
2831    ///
2832    /// This is the read-only, pull-only path: `execute_streaming` rejects
2833    /// mutations (INSERT/DELETE/SET), schema/session commands, EXPLAIN,
2834    /// PROFILE, and queries whose planner emits a push-based pipeline. Run
2835    /// those via [`execute`](Self::execute) instead.
2836    ///
2837    /// # Stability: Experimental
2838    ///
2839    /// New in 0.5.40. Signature may change before being promoted to Beta.
2840    ///
2841    /// # Errors
2842    ///
2843    /// Returns an error if parsing or planning fails, if the query is a
2844    /// kind that cannot be streamed, or if permission checks fail.
2845    #[cfg(all(feature = "gql", feature = "lpg"))]
2846    pub fn execute_streaming(
2847        &self,
2848        query: &str,
2849    ) -> Result<crate::query::executor::stream::ResultStream<'_>> {
2850        use crate::query::executor::stream::{ResultStream, StreamGuard};
2851
2852        let (source, columns, deadline) = self.build_streaming_plan(query)?;
2853        let guard = StreamGuard::new(&self.active_streams);
2854        Ok(ResultStream::new(source, columns, deadline, guard))
2855    }
2856
2857    /// Builds a pull-based physical pipeline for streaming, returning the
2858    /// root operator and metadata needed to wrap it in a `ResultStream` or
2859    /// `OwnedResultStream`.
2860    ///
2861    /// Rejects session/schema commands, mutations, EXPLAIN/PROFILE, and plans
2862    /// that require a push-based pipeline. Used by both
2863    /// [`Session::execute_streaming`] and [`GrafeoDB::execute_streaming`].
2864    #[cfg(all(feature = "gql", feature = "lpg"))]
2865    pub(crate) fn build_streaming_plan(
2866        &self,
2867        query: &str,
2868    ) -> Result<(
2869        Box<dyn grafeo_core::execution::operators::Operator>,
2870        Vec<String>,
2871        Option<Instant>,
2872    )> {
2873        use crate::query::{
2874            binder::Binder, cache::CacheKey, optimizer::Optimizer, processor::QueryLanguage,
2875            translators::gql,
2876        };
2877
2878        self.require_lpg("GQL")?;
2879
2880        let _span = grafeo_info_span!(
2881            "grafeo::session::execute_streaming",
2882            language = "gql",
2883            query_len = query.len(),
2884        );
2885
2886        // Parse and translate, rejecting anything that isn't a streamable query.
2887        let translation = gql::translate_full(query)?;
2888        let logical_plan = match translation {
2889            gql::GqlTranslationResult::SessionCommand(_) => {
2890                return Err(grafeo_common::utils::error::Error::Query(
2891                    grafeo_common::utils::error::QueryError::new(
2892                        grafeo_common::utils::error::QueryErrorKind::Semantic,
2893                        "session commands cannot be streamed; use execute() instead",
2894                    ),
2895                ));
2896            }
2897            gql::GqlTranslationResult::SchemaCommand(_) => {
2898                return Err(grafeo_common::utils::error::Error::Query(
2899                    grafeo_common::utils::error::QueryError::new(
2900                        grafeo_common::utils::error::QueryErrorKind::Semantic,
2901                        "schema DDL cannot be streamed; use execute() instead",
2902                    ),
2903                ));
2904            }
2905            gql::GqlTranslationResult::Plan(plan) => {
2906                if plan.root.has_mutations() {
2907                    return Err(grafeo_common::utils::error::Error::Query(
2908                        grafeo_common::utils::error::QueryError::new(
2909                            grafeo_common::utils::error::QueryErrorKind::Semantic,
2910                            "mutating queries cannot be streamed; use execute() instead",
2911                        ),
2912                    ));
2913                }
2914                if !self.identity.can_admin() {
2915                    self.require_permission(crate::auth::StatementKind::Read)?;
2916                }
2917                plan
2918            }
2919        };
2920
2921        // Cache + bind + optimize (same path as execute).
2922        let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
2923        let optimized_plan = if let Some(cached) = self.query_cache.get_optimized(&cache_key) {
2924            cached
2925        } else {
2926            let mut binder = Binder::new();
2927            let _binding_context = binder.bind(&logical_plan)?;
2928            let active = self.active_store();
2929            let optimizer = Optimizer::from_graph_store(&*active);
2930            let plan = optimizer.optimize(logical_plan)?;
2931            self.query_cache.put_optimized(cache_key, plan.clone());
2932            plan
2933        };
2934
2935        if optimized_plan.explain || optimized_plan.profile {
2936            return Err(grafeo_common::utils::error::Error::Query(
2937                grafeo_common::utils::error::QueryError::new(
2938                    grafeo_common::utils::error::QueryErrorKind::Semantic,
2939                    "EXPLAIN and PROFILE cannot be streamed; use execute() instead",
2940                ),
2941            ));
2942        }
2943
2944        // Plan to physical operators.
2945        let active = self.active_store();
2946        let has_active_tx = self.current_transaction.lock().is_some();
2947        let (viewing_epoch, transaction_id) = self.get_transaction_context();
2948        let planner = self.create_planner_for_store_with_read_only(
2949            Arc::clone(&active),
2950            viewing_epoch,
2951            transaction_id,
2952            !has_active_tx,
2953        );
2954        let physical_plan = planner.plan(&optimized_plan)?;
2955        let columns = physical_plan.columns.clone();
2956
2957        // Streaming only supports the pure pull path. Reject plans that would
2958        // require push operators (Sort, Aggregate, Distinct, etc.) so we never
2959        // silently materialize under the hood.
2960        let (source, push_ops) = {
2961            #[cfg(feature = "spill")]
2962            {
2963                let memory_ctx = self.make_operator_memory_context();
2964                grafeo_core::execution::pipeline_convert::convert_to_pipeline_with_memory(
2965                    physical_plan.into_operator(),
2966                    memory_ctx,
2967                )
2968            }
2969            #[cfg(not(feature = "spill"))]
2970            {
2971                grafeo_core::execution::pipeline_convert::convert_to_pipeline(
2972                    physical_plan.into_operator(),
2973                )
2974            }
2975        };
2976        if !push_ops.is_empty() {
2977            return Err(grafeo_common::utils::error::Error::Query(
2978                grafeo_common::utils::error::QueryError::new(
2979                    grafeo_common::utils::error::QueryErrorKind::Semantic,
2980                    "query requires a push-based pipeline (ORDER BY / aggregate / DISTINCT) \
2981                     which cannot be streamed; use execute() instead",
2982                ),
2983            ));
2984        }
2985
2986        Ok((source, columns, self.query_deadline()))
2987    }
2988
2989    /// Executes a GQL query with visibility at the specified epoch.
2990    ///
2991    /// This enables time-travel queries: the query sees the database
2992    /// as it existed at the given epoch.
2993    ///
2994    /// # Errors
2995    ///
2996    /// Returns an error if parsing or execution fails.
2997    #[cfg(feature = "gql")]
2998    pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
2999        let previous = self.viewing_epoch_override.lock().replace(epoch);
3000        let result = self.execute(query);
3001        *self.viewing_epoch_override.lock() = previous;
3002        result
3003    }
3004
3005    /// Executes a GQL query at a specific epoch with optional parameters.
3006    ///
3007    /// Combines epoch-based time travel with parameterized queries.
3008    ///
3009    /// # Errors
3010    ///
3011    /// Returns an error if parsing or execution fails.
3012    #[cfg(feature = "gql")]
3013    pub fn execute_at_epoch_with_params(
3014        &self,
3015        query: &str,
3016        epoch: EpochId,
3017        params: Option<std::collections::HashMap<String, Value>>,
3018    ) -> Result<QueryResult> {
3019        let previous = self.viewing_epoch_override.lock().replace(epoch);
3020        let result = if let Some(p) = params {
3021            self.execute_with_params(query, p)
3022        } else {
3023            self.execute(query)
3024        };
3025        *self.viewing_epoch_override.lock() = previous;
3026        result
3027    }
3028
3029    /// Executes a GQL query with parameters.
3030    ///
3031    /// # Errors
3032    ///
3033    /// Returns an error if the query fails to parse or execute.
3034    #[cfg(feature = "gql")]
3035    pub fn execute_with_params(
3036        &self,
3037        query: &str,
3038        params: std::collections::HashMap<String, Value>,
3039    ) -> Result<QueryResult> {
3040        self.require_lpg("GQL")?;
3041
3042        use crate::query::processor::{QueryLanguage, QueryProcessor};
3043
3044        // Reject writes if the identity lacks permission. Parse the query
3045        // to determine mutation status reliably (the text heuristic has false
3046        // negatives that could bypass authorization).
3047        let has_mutations = if self.identity.can_write() {
3048            // Fast path: identity can write, use heuristic for auto-commit only
3049            Self::query_looks_like_mutation(query)
3050        } else {
3051            // Restricted identity: parse to check mutations reliably
3052            use crate::query::translators::gql;
3053            match gql::translate(query) {
3054                Ok(plan) if plan.root.has_mutations() => {
3055                    self.require_permission(crate::auth::StatementKind::Write)?;
3056                    true
3057                }
3058                Ok(_) => false,
3059                // Parse error: let the processor handle it below
3060                Err(_) => Self::query_looks_like_mutation(query),
3061            }
3062        };
3063        let active = self.active_store();
3064
3065        self.with_auto_commit(has_mutations, || {
3066            // Get transaction context for MVCC visibility
3067            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3068
3069            // Create processor with transaction context
3070            let processor = QueryProcessor::for_stores_with_transaction(
3071                Arc::clone(&active),
3072                self.active_write_store(),
3073                Arc::clone(&self.transaction_manager),
3074            )?;
3075
3076            // Apply transaction context if in a transaction
3077            let processor = if let Some(transaction_id) = transaction_id {
3078                processor.with_transaction_context(viewing_epoch, transaction_id)
3079            } else {
3080                processor
3081            };
3082
3083            processor.process(query, QueryLanguage::Gql, Some(&params))
3084        })
3085    }
3086
3087    /// Executes a GQL query with parameters.
3088    ///
3089    /// # Errors
3090    ///
3091    /// Returns an error if no query language is enabled.
3092    #[cfg(not(any(feature = "gql", feature = "cypher")))]
3093    pub fn execute_with_params(
3094        &self,
3095        _query: &str,
3096        _params: std::collections::HashMap<String, Value>,
3097    ) -> Result<QueryResult> {
3098        Err(grafeo_common::utils::error::Error::Internal(
3099            "No query language enabled".to_string(),
3100        ))
3101    }
3102
3103    /// Executes a GQL query.
3104    ///
3105    /// # Errors
3106    ///
3107    /// Returns an error if no query language is enabled.
3108    #[cfg(not(any(feature = "gql", feature = "cypher")))]
3109    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
3110        Err(grafeo_common::utils::error::Error::Internal(
3111            "No query language enabled".to_string(),
3112        ))
3113    }
3114
3115    /// Executes a Cypher query.
3116    ///
3117    /// # Errors
3118    ///
3119    /// Returns an error if the query fails to parse or execute.
3120    #[cfg(feature = "cypher")]
3121    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
3122        use crate::query::{
3123            binder::Binder, cache::CacheKey, optimizer::Optimizer, processor::QueryLanguage,
3124            translators::cypher,
3125        };
3126
3127        // Handle schema DDL and SHOW commands before the normal query path
3128        let translation = cypher::translate_full(query)?;
3129        match translation {
3130            #[cfg(feature = "lpg")]
3131            cypher::CypherTranslationResult::SchemaCommand(cmd) => {
3132                use grafeo_common::utils::error::{
3133                    Error as GrafeoError, QueryError, QueryErrorKind,
3134                };
3135                self.require_permission(crate::auth::StatementKind::Admin)?;
3136                if *self.read_only_tx.lock() {
3137                    return Err(GrafeoError::Query(QueryError::new(
3138                        QueryErrorKind::Semantic,
3139                        "Cannot execute schema DDL in a read-only transaction",
3140                    )));
3141                }
3142                return self.execute_schema_command(cmd);
3143            }
3144            #[cfg(not(feature = "lpg"))]
3145            cypher::CypherTranslationResult::SchemaCommand(_) => {
3146                return Err(grafeo_common::utils::error::Error::Internal(
3147                    "Schema DDL requires the `lpg` feature".to_string(),
3148                ));
3149            }
3150            cypher::CypherTranslationResult::ShowIndexes => {
3151                return self.execute_show_indexes();
3152            }
3153            cypher::CypherTranslationResult::ShowConstraints => {
3154                return self.execute_show_constraints();
3155            }
3156            cypher::CypherTranslationResult::ShowCurrentGraphType => {
3157                return self.execute_show_current_graph_type();
3158            }
3159            cypher::CypherTranslationResult::Plan(_) => {
3160                // Fall through to normal execution below
3161            }
3162        }
3163
3164        #[cfg(not(target_arch = "wasm32"))]
3165        let start_time = std::time::Instant::now();
3166
3167        // Create cache key for this query
3168        let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());
3169
3170        // Try to get cached optimized plan
3171        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
3172            cached_plan
3173        } else {
3174            // Parse and translate the query to a logical plan
3175            let logical_plan = cypher::translate(query)?;
3176
3177            // Semantic validation
3178            let mut binder = Binder::new();
3179            let _binding_context = binder.bind(&logical_plan)?;
3180
3181            // Optimize the plan
3182            let active = self.active_store();
3183            let optimizer = Optimizer::from_graph_store(&*active);
3184            let plan = optimizer.optimize(logical_plan)?;
3185
3186            // Cache the optimized plan
3187            self.query_cache.put_optimized(cache_key, plan.clone());
3188
3189            plan
3190        };
3191
3192        // Check role-based permission for mutations
3193        if optimized_plan.root.has_mutations() {
3194            self.require_permission(crate::auth::StatementKind::Write)?;
3195        }
3196
3197        // Resolve the active store for query execution
3198        let active = self.active_store();
3199
3200        // EXPLAIN
3201        if optimized_plan.explain {
3202            use crate::query::processor::{annotate_pushdown_hints, explain_result};
3203            let mut plan = optimized_plan;
3204            annotate_pushdown_hints(&mut plan.root, active.as_ref());
3205            return Ok(explain_result(&plan));
3206        }
3207
3208        // PROFILE
3209        if optimized_plan.profile {
3210            let has_mutations = optimized_plan.root.has_mutations();
3211            return self.with_auto_commit(has_mutations, || {
3212                let (viewing_epoch, transaction_id) = self.get_transaction_context();
3213                let planner = self.create_planner_for_store(
3214                    Arc::clone(&active),
3215                    viewing_epoch,
3216                    transaction_id,
3217                );
3218                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
3219
3220                let executor = self.make_executor(physical_plan.columns.clone());
3221                let _result = executor.execute(physical_plan.operator.as_mut())?;
3222
3223                let total_time_ms;
3224                #[cfg(not(target_arch = "wasm32"))]
3225                {
3226                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
3227                }
3228                #[cfg(target_arch = "wasm32")]
3229                {
3230                    total_time_ms = 0.0;
3231                }
3232
3233                let profile_tree = crate::query::profile::build_profile_tree(
3234                    &optimized_plan.root,
3235                    &mut entries.into_iter(),
3236                );
3237                Ok(crate::query::profile::profile_result(
3238                    &profile_tree,
3239                    total_time_ms,
3240                ))
3241            });
3242        }
3243
3244        let has_mutations = optimized_plan.root.has_mutations();
3245
3246        let result = self.with_auto_commit(has_mutations, || {
3247            // Get transaction context for MVCC visibility
3248            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3249
3250            // Convert to physical plan with transaction context
3251            let planner =
3252                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3253            let mut physical_plan = planner.plan(&optimized_plan)?;
3254
3255            // Execute the plan
3256            let executor = self.make_executor(physical_plan.columns.clone());
3257            executor.execute(physical_plan.operator.as_mut())
3258        });
3259
3260        #[cfg(feature = "metrics")]
3261        {
3262            #[cfg(not(target_arch = "wasm32"))]
3263            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3264            #[cfg(target_arch = "wasm32")]
3265            let elapsed_ms = None;
3266            self.record_query_metrics("cypher", elapsed_ms, &result);
3267        }
3268
3269        result
3270    }
3271
3272    /// Executes a Gremlin query.
3273    ///
3274    /// # Errors
3275    ///
3276    /// Returns an error if the query fails to parse or execute.
3277    ///
3278    /// # Examples
3279    ///
3280    /// ```no_run
3281    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3282    /// use grafeo_engine::GrafeoDB;
3283    ///
3284    /// let db = GrafeoDB::new_in_memory();
3285    /// let session = db.session();
3286    ///
3287    /// // Create some nodes first
3288    /// session.create_node(&["Person"]);
3289    ///
3290    /// // Query using Gremlin
3291    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
3292    /// # Ok(())
3293    /// # }
3294    /// ```
3295    #[cfg(feature = "gremlin")]
3296    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
3297        use crate::query::{binder::Binder, optimizer::Optimizer, translators::gremlin};
3298
3299        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3300        let start_time = Instant::now();
3301
3302        // Parse and translate the query to a logical plan
3303        let logical_plan = gremlin::translate(query)?;
3304
3305        // Semantic validation
3306        let mut binder = Binder::new();
3307        let _binding_context = binder.bind(&logical_plan)?;
3308
3309        // Optimize the plan
3310        let active = self.active_store();
3311        let optimizer = Optimizer::from_graph_store(&*active);
3312        let optimized_plan = optimizer.optimize(logical_plan)?;
3313
3314        let has_mutations = optimized_plan.root.has_mutations();
3315        if has_mutations {
3316            self.require_permission(crate::auth::StatementKind::Write)?;
3317        }
3318
3319        let result = self.with_auto_commit(has_mutations, || {
3320            // Get transaction context for MVCC visibility
3321            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3322
3323            // Convert to physical plan with transaction context
3324            let planner =
3325                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3326            let mut physical_plan = planner.plan(&optimized_plan)?;
3327
3328            // Execute the plan
3329            let executor = self.make_executor(physical_plan.columns.clone());
3330            executor.execute(physical_plan.operator.as_mut())
3331        });
3332
3333        #[cfg(feature = "metrics")]
3334        {
3335            #[cfg(not(target_arch = "wasm32"))]
3336            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3337            #[cfg(target_arch = "wasm32")]
3338            let elapsed_ms = None;
3339            self.record_query_metrics("gremlin", elapsed_ms, &result);
3340        }
3341
3342        result
3343    }
3344
3345    /// Executes a Gremlin query with parameters.
3346    ///
3347    /// # Errors
3348    ///
3349    /// Returns an error if the query fails to parse or execute.
3350    #[cfg(feature = "gremlin")]
3351    pub fn execute_gremlin_with_params(
3352        &self,
3353        query: &str,
3354        params: std::collections::HashMap<String, Value>,
3355    ) -> Result<QueryResult> {
3356        use crate::query::{
3357            binder::Binder, optimizer::Optimizer, processor::substitute_params,
3358            translators::gremlin,
3359        };
3360
3361        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3362        let start_time = Instant::now();
3363
3364        // Parse and translate the query to a logical plan
3365        let mut logical_plan = gremlin::translate(query)?;
3366
3367        // Substitute parameters
3368        substitute_params(&mut logical_plan, &params)?;
3369
3370        // Semantic validation
3371        let mut binder = Binder::new();
3372        let _binding_context = binder.bind(&logical_plan)?;
3373
3374        // Optimize the plan
3375        let active = self.active_store();
3376        let optimizer = Optimizer::from_graph_store(&*active);
3377        let optimized_plan = optimizer.optimize(logical_plan)?;
3378
3379        let has_mutations = optimized_plan.root.has_mutations();
3380        if has_mutations {
3381            self.require_permission(crate::auth::StatementKind::Write)?;
3382        }
3383
3384        let result = self.with_auto_commit(has_mutations, || {
3385            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3386            let planner =
3387                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3388            let mut physical_plan = planner.plan(&optimized_plan)?;
3389            let executor = self.make_executor(physical_plan.columns.clone());
3390            executor.execute(physical_plan.operator.as_mut())
3391        });
3392
3393        #[cfg(feature = "metrics")]
3394        {
3395            #[cfg(not(target_arch = "wasm32"))]
3396            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3397            #[cfg(target_arch = "wasm32")]
3398            let elapsed_ms = None;
3399            self.record_query_metrics("gremlin", elapsed_ms, &result);
3400        }
3401
3402        result
3403    }
3404
3405    /// Executes a GraphQL query against the LPG store.
3406    ///
3407    /// # Errors
3408    ///
3409    /// Returns an error if the query fails to parse or execute.
3410    ///
3411    /// # Examples
3412    ///
3413    /// ```no_run
3414    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3415    /// use grafeo_engine::GrafeoDB;
3416    ///
3417    /// let db = GrafeoDB::new_in_memory();
3418    /// let session = db.session();
3419    ///
3420    /// // Create some nodes first
3421    /// session.create_node(&["User"]);
3422    ///
3423    /// // Query using GraphQL
3424    /// let result = session.execute_graphql("query { user { id name } }")?;
3425    /// # Ok(())
3426    /// # }
3427    /// ```
3428    #[cfg(feature = "graphql")]
3429    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
3430        use crate::query::{
3431            binder::Binder, optimizer::Optimizer, processor::substitute_params,
3432            translators::graphql,
3433        };
3434
3435        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3436        let start_time = Instant::now();
3437
3438        let mut logical_plan = graphql::translate(query)?;
3439
3440        // Substitute default parameter values from variable declarations
3441        if !logical_plan.default_params.is_empty() {
3442            let defaults = logical_plan.default_params.clone();
3443            substitute_params(&mut logical_plan, &defaults)?;
3444        }
3445
3446        let mut binder = Binder::new();
3447        let _binding_context = binder.bind(&logical_plan)?;
3448
3449        let active = self.active_store();
3450        let optimizer = Optimizer::from_graph_store(&*active);
3451        let optimized_plan = optimizer.optimize(logical_plan)?;
3452        let has_mutations = optimized_plan.root.has_mutations();
3453        if has_mutations {
3454            self.require_permission(crate::auth::StatementKind::Write)?;
3455        }
3456
3457        let result = self.with_auto_commit(has_mutations, || {
3458            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3459            let planner =
3460                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3461            let mut physical_plan = planner.plan(&optimized_plan)?;
3462            let executor = self.make_executor(physical_plan.columns.clone());
3463            executor.execute(physical_plan.operator.as_mut())
3464        });
3465
3466        #[cfg(feature = "metrics")]
3467        {
3468            #[cfg(not(target_arch = "wasm32"))]
3469            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3470            #[cfg(target_arch = "wasm32")]
3471            let elapsed_ms = None;
3472            self.record_query_metrics("graphql", elapsed_ms, &result);
3473        }
3474
3475        result
3476    }
3477
3478    /// Executes a GraphQL query with parameters.
3479    ///
3480    /// # Errors
3481    ///
3482    /// Returns an error if the query fails to parse or execute.
3483    #[cfg(feature = "graphql")]
3484    pub fn execute_graphql_with_params(
3485        &self,
3486        query: &str,
3487        params: std::collections::HashMap<String, Value>,
3488    ) -> Result<QueryResult> {
3489        use crate::query::{
3490            binder::Binder, optimizer::Optimizer, processor::substitute_params,
3491            translators::graphql,
3492        };
3493
3494        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3495        let start_time = Instant::now();
3496
3497        // Parse and translate the query to a logical plan
3498        let mut logical_plan = graphql::translate(query)?;
3499
3500        // Merge default params with caller-supplied params
3501        if !logical_plan.default_params.is_empty() {
3502            let mut merged = logical_plan.default_params.clone();
3503            merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
3504            substitute_params(&mut logical_plan, &merged)?;
3505        } else {
3506            substitute_params(&mut logical_plan, &params)?;
3507        }
3508
3509        // Semantic validation
3510        let mut binder = Binder::new();
3511        let _binding_context = binder.bind(&logical_plan)?;
3512
3513        // Optimize the plan
3514        let active = self.active_store();
3515        let optimizer = Optimizer::from_graph_store(&*active);
3516        let optimized_plan = optimizer.optimize(logical_plan)?;
3517
3518        let has_mutations = optimized_plan.root.has_mutations();
3519        if has_mutations {
3520            self.require_permission(crate::auth::StatementKind::Write)?;
3521        }
3522
3523        let result = self.with_auto_commit(has_mutations, || {
3524            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3525            let planner =
3526                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3527            let mut physical_plan = planner.plan(&optimized_plan)?;
3528            let executor = self.make_executor(physical_plan.columns.clone());
3529            executor.execute(physical_plan.operator.as_mut())
3530        });
3531
3532        #[cfg(feature = "metrics")]
3533        {
3534            #[cfg(not(target_arch = "wasm32"))]
3535            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3536            #[cfg(target_arch = "wasm32")]
3537            let elapsed_ms = None;
3538            self.record_query_metrics("graphql", elapsed_ms, &result);
3539        }
3540
3541        result
3542    }
3543
3544    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
3545    ///
3546    /// # Errors
3547    ///
3548    /// Returns an error if the query fails to parse or execute.
3549    ///
3550    /// # Examples
3551    ///
3552    /// ```no_run
3553    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3554    /// use grafeo_engine::GrafeoDB;
3555    ///
3556    /// let db = GrafeoDB::new_in_memory();
3557    /// let session = db.session();
3558    ///
3559    /// let result = session.execute_sql(
3560    ///     "SELECT * FROM GRAPH_TABLE (
3561    ///         MATCH (n:Person)
3562    ///         COLUMNS (n.name AS name)
3563    ///     )"
3564    /// )?;
3565    /// # Ok(())
3566    /// # }
3567    /// ```
3568    #[cfg(feature = "sql-pgq")]
3569    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
3570        use crate::query::{
3571            binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
3572            processor::QueryLanguage, translators::sql_pgq,
3573        };
3574
3575        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3576        let start_time = Instant::now();
3577
3578        // Parse and translate (always needed to check for DDL)
3579        let logical_plan = sql_pgq::translate(query)?;
3580
3581        // Handle DDL statements directly (they don't go through the query pipeline)
3582        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
3583            self.require_permission(crate::auth::StatementKind::Admin)?;
3584            return Ok(QueryResult {
3585                columns: vec!["status".into()],
3586                column_types: vec![grafeo_common::types::LogicalType::String],
3587                rows: vec![vec![Value::from(format!(
3588                    "Property graph '{}' created ({} node tables, {} edge tables)",
3589                    cpg.name,
3590                    cpg.node_tables.len(),
3591                    cpg.edge_tables.len()
3592                ))]],
3593                execution_time_ms: None,
3594                rows_scanned: None,
3595                status_message: None,
3596                gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
3597            });
3598        }
3599
3600        let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());
3601
3602        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
3603            cached_plan
3604        } else {
3605            let mut binder = Binder::new();
3606            let _binding_context = binder.bind(&logical_plan)?;
3607            let active = self.active_store();
3608            let optimizer = Optimizer::from_graph_store(&*active);
3609            let plan = optimizer.optimize(logical_plan)?;
3610            self.query_cache.put_optimized(cache_key, plan.clone());
3611            plan
3612        };
3613
3614        let active = self.active_store();
3615        let has_mutations = optimized_plan.root.has_mutations();
3616        if has_mutations {
3617            self.require_permission(crate::auth::StatementKind::Write)?;
3618        }
3619
3620        let result = self.with_auto_commit(has_mutations, || {
3621            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3622            let planner =
3623                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3624            let mut physical_plan = planner.plan(&optimized_plan)?;
3625            let executor = self.make_executor(physical_plan.columns.clone());
3626            executor.execute(physical_plan.operator.as_mut())
3627        });
3628
3629        #[cfg(feature = "metrics")]
3630        {
3631            #[cfg(not(target_arch = "wasm32"))]
3632            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3633            #[cfg(target_arch = "wasm32")]
3634            let elapsed_ms = None;
3635            self.record_query_metrics("sql", elapsed_ms, &result);
3636        }
3637
3638        result
3639    }
3640
3641    /// Executes a SQL/PGQ query with parameters.
3642    ///
3643    /// # Errors
3644    ///
3645    /// Returns an error if the query fails to parse or execute.
3646    #[cfg(feature = "sql-pgq")]
3647    pub fn execute_sql_with_params(
3648        &self,
3649        query: &str,
3650        params: std::collections::HashMap<String, Value>,
3651    ) -> Result<QueryResult> {
3652        use crate::query::processor::{QueryLanguage, QueryProcessor};
3653
3654        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3655        let start_time = Instant::now();
3656
3657        let has_mutations = if self.identity.can_write() {
3658            Self::query_looks_like_mutation(query)
3659        } else {
3660            use crate::query::translators::sql_pgq;
3661            match sql_pgq::translate(query) {
3662                Ok(plan) if plan.root.has_mutations() => {
3663                    self.require_permission(crate::auth::StatementKind::Write)?;
3664                    true
3665                }
3666                Ok(_) => false,
3667                Err(_) => Self::query_looks_like_mutation(query),
3668            }
3669        };
3670        if has_mutations {
3671            self.require_permission(crate::auth::StatementKind::Write)?;
3672        }
3673        let active = self.active_store();
3674
3675        let result = self.with_auto_commit(has_mutations, || {
3676            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3677            let processor = QueryProcessor::for_stores_with_transaction(
3678                Arc::clone(&active),
3679                self.active_write_store(),
3680                Arc::clone(&self.transaction_manager),
3681            )?;
3682            let processor = if let Some(transaction_id) = transaction_id {
3683                processor.with_transaction_context(viewing_epoch, transaction_id)
3684            } else {
3685                processor
3686            };
3687            processor.process(query, QueryLanguage::SqlPgq, Some(&params))
3688        });
3689
3690        #[cfg(feature = "metrics")]
3691        {
3692            #[cfg(not(target_arch = "wasm32"))]
3693            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3694            #[cfg(target_arch = "wasm32")]
3695            let elapsed_ms = None;
3696            self.record_query_metrics("sql", elapsed_ms, &result);
3697        }
3698
3699        result
3700    }
3701
3702    /// Executes a query in the specified language by name.
3703    ///
3704    /// Supported language names: `"gql"`, `"cypher"`, `"gremlin"`, `"graphql"`,
3705    /// `"graphql-rdf"`, `"sparql"`, `"sql"`. Each requires the corresponding feature flag.
3706    ///
3707    /// # Errors
3708    ///
3709    /// Returns an error if the language is unknown/disabled or the query fails.
3710    pub fn execute_language(
3711        &self,
3712        query: &str,
3713        language: &str,
3714        params: Option<std::collections::HashMap<String, Value>>,
3715    ) -> Result<QueryResult> {
3716        let _span = grafeo_info_span!(
3717            "grafeo::session::execute",
3718            language,
3719            query_len = query.len(),
3720        );
3721        match language {
3722            "gql" => {
3723                if let Some(p) = params {
3724                    self.execute_with_params(query, p)
3725                } else {
3726                    self.execute(query)
3727                }
3728            }
3729            #[cfg(feature = "cypher")]
3730            "cypher" => {
3731                if let Some(p) = params {
3732                    use crate::query::processor::{QueryLanguage, QueryProcessor};
3733
3734                    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3735                    let start_time = Instant::now();
3736
3737                    let has_mutations = if self.identity.can_write() {
3738                        Self::query_looks_like_mutation(query)
3739                    } else {
3740                        use crate::query::translators::cypher;
3741                        match cypher::translate(query) {
3742                            Ok(plan) if plan.root.has_mutations() => {
3743                                self.require_permission(crate::auth::StatementKind::Write)?;
3744                                true
3745                            }
3746                            Ok(_) => false,
3747                            Err(_) => Self::query_looks_like_mutation(query),
3748                        }
3749                    };
3750                    let active = self.active_store();
3751                    let result = self.with_auto_commit(has_mutations, || {
3752                        let processor = QueryProcessor::for_stores_with_transaction(
3753                            Arc::clone(&active),
3754                            self.active_write_store(),
3755                            Arc::clone(&self.transaction_manager),
3756                        )?;
3757                        let (viewing_epoch, transaction_id) = self.get_transaction_context();
3758                        let processor = if let Some(transaction_id) = transaction_id {
3759                            processor.with_transaction_context(viewing_epoch, transaction_id)
3760                        } else {
3761                            processor
3762                        };
3763                        processor.process(query, QueryLanguage::Cypher, Some(&p))
3764                    });
3765
3766                    #[cfg(feature = "metrics")]
3767                    {
3768                        #[cfg(not(target_arch = "wasm32"))]
3769                        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3770                        #[cfg(target_arch = "wasm32")]
3771                        let elapsed_ms = None;
3772                        self.record_query_metrics("cypher", elapsed_ms, &result);
3773                    }
3774
3775                    result
3776                } else {
3777                    self.execute_cypher(query)
3778                }
3779            }
3780            #[cfg(feature = "gremlin")]
3781            "gremlin" => {
3782                if let Some(p) = params {
3783                    self.execute_gremlin_with_params(query, p)
3784                } else {
3785                    self.execute_gremlin(query)
3786                }
3787            }
3788            #[cfg(feature = "graphql")]
3789            "graphql" => {
3790                if let Some(p) = params {
3791                    self.execute_graphql_with_params(query, p)
3792                } else {
3793                    self.execute_graphql(query)
3794                }
3795            }
3796            #[cfg(all(feature = "graphql", feature = "triple-store"))]
3797            "graphql-rdf" => {
3798                if let Some(p) = params {
3799                    self.execute_graphql_rdf_with_params(query, p)
3800                } else {
3801                    self.execute_graphql_rdf(query)
3802                }
3803            }
3804            #[cfg(feature = "sql-pgq")]
3805            "sql" | "sql-pgq" => {
3806                if let Some(p) = params {
3807                    self.execute_sql_with_params(query, p)
3808                } else {
3809                    self.execute_sql(query)
3810                }
3811            }
3812            #[cfg(all(feature = "sparql", feature = "triple-store"))]
3813            "sparql" => {
3814                if let Some(p) = params {
3815                    self.execute_sparql_with_params(query, p)
3816                } else {
3817                    self.execute_sparql(query)
3818                }
3819            }
3820            other => Err(grafeo_common::utils::error::Error::Query(
3821                grafeo_common::utils::error::QueryError::new(
3822                    grafeo_common::utils::error::QueryErrorKind::Semantic,
3823                    format!("Unknown query language: '{other}'"),
3824                ),
3825            )),
3826        }
3827    }
3828
3829    /// Begins a new transaction.
3830    ///
3831    /// # Errors
3832    ///
3833    /// Returns an error if a transaction is already active.
3834    ///
3835    /// # Examples
3836    ///
3837    /// ```no_run
3838    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3839    /// use grafeo_engine::GrafeoDB;
3840    ///
3841    /// let db = GrafeoDB::new_in_memory();
3842    /// let mut session = db.session();
3843    ///
3844    /// session.begin_transaction()?;
3845    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3846    /// session.execute("INSERT (:Person {name: 'Gus'})")?;
3847    /// session.commit()?; // Both inserts committed atomically
3848    /// # Ok(())
3849    /// # }
3850    /// ```
3851    /// Clears all cached query plans.
3852    ///
3853    /// The plan cache is shared across all sessions on the same database,
3854    /// so clearing from one session affects all sessions.
3855    pub fn clear_plan_cache(&self) {
3856        self.query_cache.clear();
3857    }
3858
3859    /// Begins a new transaction on this session.
3860    ///
3861    /// Uses the default isolation level (`SnapshotIsolation`).
3862    ///
3863    /// # Errors
3864    ///
3865    /// Returns an error if a transaction is already active.
3866    #[cfg(feature = "lpg")]
3867    pub fn begin_transaction(&mut self) -> Result<()> {
3868        self.begin_transaction_inner(false, None)
3869    }
3870
3871    /// Begins a transaction with a specific isolation level.
3872    ///
3873    /// See [`begin_transaction`](Self::begin_transaction) for the default (`SnapshotIsolation`).
3874    ///
3875    /// # Errors
3876    ///
3877    /// Returns an error if a transaction is already active.
3878    #[cfg(feature = "lpg")]
3879    pub fn begin_transaction_with_isolation(
3880        &mut self,
3881        isolation_level: crate::transaction::IsolationLevel,
3882    ) -> Result<()> {
3883        self.begin_transaction_inner(false, Some(isolation_level))
3884    }
3885
3886    /// Core transaction begin logic, usable from both `&mut self` and `&self` paths.
3887    #[cfg(feature = "lpg")]
3888    fn begin_transaction_inner(
3889        &self,
3890        read_only: bool,
3891        isolation_level: Option<crate::transaction::IsolationLevel>,
3892    ) -> Result<()> {
3893        let _span = grafeo_debug_span!("grafeo::tx::begin", read_only);
3894        let mut current = self.current_transaction.lock();
3895        if current.is_some() {
3896            // Nested transaction: create an auto-savepoint instead of a new tx.
3897            drop(current);
3898            let mut depth = self.transaction_nesting_depth.lock();
3899            *depth += 1;
3900            let sp_name = format!("_nested_tx_{}", *depth);
3901            self.savepoint(&sp_name)?;
3902            return Ok(());
3903        }
3904
3905        let active = self.active_lpg_store();
3906        self.transaction_start_node_count
3907            .store(active.node_count(), Ordering::Relaxed);
3908        self.transaction_start_edge_count
3909            .store(active.edge_count(), Ordering::Relaxed);
3910        let transaction_id = if let Some(level) = isolation_level {
3911            self.transaction_manager.begin_with_isolation(level)
3912        } else {
3913            self.transaction_manager.begin()
3914        };
3915        *current = Some(transaction_id);
3916        *self.read_only_tx.lock() = read_only || self.db_read_only;
3917
3918        // Record the initial graph as "touched" for cross-graph atomicity.
3919        // Uses the full storage key (schema/graph) for schema-scoped resolution.
3920        let key = self.active_graph_storage_key();
3921        let mut touched = self.touched_graphs.lock();
3922        touched.clear();
3923        touched.push(key);
3924
3925        #[cfg(feature = "metrics")]
3926        {
3927            crate::metrics::record_metric!(self.metrics, tx_active, inc);
3928            #[cfg(not(target_arch = "wasm32"))]
3929            {
3930                *self.tx_start_time.lock() = Some(Instant::now());
3931            }
3932        }
3933
3934        Ok(())
3935    }
3936
3937    /// Commits the current transaction.
3938    ///
3939    /// Makes all changes since [`begin_transaction`](Self::begin_transaction) permanent.
3940    ///
3941    /// # Errors
3942    ///
3943    /// Returns an error if no transaction is active.
3944    #[cfg(feature = "lpg")]
3945    pub fn commit(&mut self) -> Result<()> {
3946        self.commit_inner()
3947    }
3948
3949    /// Core commit logic, usable from both `&mut self` and `&self` paths.
3950    #[cfg(feature = "lpg")]
3951    fn commit_inner(&self) -> Result<()> {
3952        let _span = grafeo_debug_span!("grafeo::tx::commit");
3953
3954        #[cfg(feature = "testing-statement-injection")]
3955        if let Err(e) = grafeo_common::testing::statement_failure::maybe_fail_commit() {
3956            // Commit fails before any state is finalized. Treat it like any
3957            // other pre-prepare commit failure and auto-rollback so the
3958            // session returns to a clean, consistent state (matches real
3959            // DB semantics: commit failure implies the transaction is
3960            // aborted, not left in-flight).
3961            let _ = self.rollback_inner();
3962            return Err(grafeo_common::utils::error::Error::Internal(format!(
3963                "injected commit failure: {e}"
3964            )));
3965        }
3966
3967        self.check_no_active_streams("commit")?;
3968        // Nested transaction: release the auto-savepoint (changes are preserved).
3969        {
3970            let mut depth = self.transaction_nesting_depth.lock();
3971            if *depth > 0 {
3972                let sp_name = format!("_nested_tx_{depth}");
3973                *depth -= 1;
3974                drop(depth);
3975                return self.release_savepoint(&sp_name);
3976            }
3977        }
3978
3979        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3980            grafeo_common::utils::error::Error::Transaction(
3981                grafeo_common::utils::error::TransactionError::InvalidState(
3982                    "No active transaction".to_string(),
3983                ),
3984            )
3985        })?;
3986
3987        // Validate the transaction first (conflict detection) before committing data.
3988        // If this fails, we rollback the data changes instead of making them permanent.
3989        //
3990        // Take ownership of the touched graphs in one lock acquisition. Since
3991        // current_transaction was .take()'d above, no concurrent thread can call
3992        // track_graph_touch() for this transaction (it checks current_transaction
3993        // first), so this is safe.
3994        let touched = std::mem::take(&mut *self.touched_graphs.lock());
3995        let commit_epoch = match self.transaction_manager.commit(transaction_id) {
3996            Ok(epoch) => epoch,
3997            Err(e) => {
3998                // Conflict detected: rollback the data changes
3999                for graph_name in &touched {
4000                    let store = self.resolve_store(graph_name);
4001                    store.rollback_transaction_properties(transaction_id);
4002                }
4003                #[cfg(feature = "triple-store")]
4004                self.rollback_rdf_transaction(transaction_id);
4005                // Discard buffered CDC events on conflict rollback
4006                #[cfg(feature = "cdc")]
4007                if let Some(ref pending) = self.cdc_pending_events {
4008                    pending.lock().clear();
4009                }
4010                *self.read_only_tx.lock() = self.db_read_only;
4011                self.savepoints.lock().clear();
4012                self.touched_graphs.lock().clear();
4013                #[cfg(feature = "metrics")]
4014                {
4015                    crate::metrics::record_metric!(self.metrics, tx_active, dec);
4016                    crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
4017                    #[cfg(not(target_arch = "wasm32"))]
4018                    if let Some(start) = self.tx_start_time.lock().take() {
4019                        let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
4020                        crate::metrics::record_metric!(
4021                            self.metrics,
4022                            tx_duration,
4023                            observe duration_ms
4024                        );
4025                    }
4026                }
4027                return Err(e);
4028            }
4029        };
4030
4031        // Finalize PENDING epochs: make uncommitted versions visible at the commit epoch.
4032        for graph_name in &touched {
4033            let store = self.resolve_store(graph_name);
4034            store.finalize_version_epochs(transaction_id, commit_epoch);
4035        }
4036
4037        // Commit succeeded: discard undo logs (make changes permanent)
4038        #[cfg(feature = "triple-store")]
4039        self.commit_rdf_transaction(transaction_id);
4040
4041        for graph_name in &touched {
4042            let store = self.resolve_store(graph_name);
4043            store.commit_transaction_properties(transaction_id);
4044        }
4045
4046        // Flush buffered CDC events now that the transaction is committed.
4047        // All buffered events have PENDING epoch; assign the real commit_epoch.
4048        // Uses record_batch to acquire the write lock once per commit.
4049        #[cfg(feature = "cdc")]
4050        if let Some(ref pending) = self.cdc_pending_events {
4051            let events: Vec<crate::cdc::ChangeEvent> = pending.lock().drain(..).collect();
4052            self.cdc_log.record_batch(events.into_iter().map(|mut e| {
4053                e.epoch = commit_epoch;
4054                e
4055            }));
4056        }
4057
4058        // Log transaction commit and epoch advance to WAL so that crash
4059        // recovery can identify committed transactions and their epoch
4060        // boundaries. Without these markers, WAL recovery discards all
4061        // records as uncommitted (fixes #252 for the crash scenario).
4062        #[cfg(feature = "wal")]
4063        if let Some(ref wal) = self.wal {
4064            use grafeo_storage::wal::WalRecord;
4065            if let Err(e) = wal.log(&WalRecord::TransactionCommit { transaction_id }) {
4066                grafeo_warn!("Failed to log transaction commit to WAL: {}", e);
4067            }
4068            if let Err(e) = wal.log(&WalRecord::EpochAdvance {
4069                epoch: commit_epoch,
4070            }) {
4071                grafeo_warn!("Failed to log epoch advance to WAL: {}", e);
4072            }
4073        }
4074
4075        // Sync epoch for all touched graphs so that convenience lookups
4076        // (edge_type, get_edge, get_node) can see versions at the latest epoch.
4077        let current_epoch = self.transaction_manager.current_epoch();
4078        for graph_name in &touched {
4079            let store = self.resolve_store(graph_name);
4080            store.sync_epoch(current_epoch);
4081        }
4082
4083        // Reset read-only flag and clear savepoints.
4084        // touched_graphs was already emptied by mem::take above.
4085        *self.read_only_tx.lock() = self.db_read_only;
4086        self.savepoints.lock().clear();
4087
4088        // Auto-GC: periodically prune old MVCC versions
4089        if self.gc_interval > 0 {
4090            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
4091            if count.is_multiple_of(self.gc_interval) {
4092                #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
4093                let gc_start = std::time::Instant::now();
4094
4095                let min_epoch = self.transaction_manager.min_active_epoch();
4096                for graph_name in &touched {
4097                    let store = self.resolve_store(graph_name);
4098                    store.gc_versions(min_epoch);
4099                }
4100                self.transaction_manager.gc();
4101
4102                #[cfg(feature = "metrics")]
4103                {
4104                    crate::metrics::record_metric!(self.metrics, gc_runs, inc);
4105                    #[cfg(not(target_arch = "wasm32"))]
4106                    {
4107                        let gc_duration_ms = gc_start.elapsed().as_secs_f64() * 1000.0;
4108                        crate::metrics::record_metric!(
4109                            self.metrics,
4110                            gc_duration,
4111                            observe gc_duration_ms
4112                        );
4113                    }
4114                }
4115            }
4116        }
4117
4118        #[cfg(feature = "metrics")]
4119        {
4120            crate::metrics::record_metric!(self.metrics, tx_active, dec);
4121            crate::metrics::record_metric!(self.metrics, tx_committed, inc);
4122            #[cfg(not(target_arch = "wasm32"))]
4123            if let Some(start) = self.tx_start_time.lock().take() {
4124                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
4125                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
4126            }
4127        }
4128
4129        Ok(())
4130    }
4131
4132    /// Aborts the current transaction.
4133    ///
4134    /// Discards all changes since [`begin_transaction`](Self::begin_transaction).
4135    ///
4136    /// # Errors
4137    ///
4138    /// Returns an error if no transaction is active.
4139    ///
4140    /// # Examples
4141    ///
4142    /// ```no_run
4143    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
4144    /// use grafeo_engine::GrafeoDB;
4145    ///
4146    /// let db = GrafeoDB::new_in_memory();
4147    /// let mut session = db.session();
4148    ///
4149    /// session.begin_transaction()?;
4150    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
4151    /// session.rollback()?; // Insert is discarded
4152    /// # Ok(())
4153    /// # }
4154    /// ```
4155    #[cfg(feature = "lpg")]
4156    pub fn rollback(&mut self) -> Result<()> {
4157        self.rollback_inner()
4158    }
4159
4160    /// Core rollback logic, usable from both `&mut self` and `&self` paths.
4161    #[cfg(feature = "lpg")]
4162    fn rollback_inner(&self) -> Result<()> {
4163        let _span = grafeo_debug_span!("grafeo::tx::rollback");
4164        self.check_no_active_streams("rollback")?;
4165        // Nested transaction: rollback to the auto-savepoint.
4166        {
4167            let mut depth = self.transaction_nesting_depth.lock();
4168            if *depth > 0 {
4169                let sp_name = format!("_nested_tx_{depth}");
4170                *depth -= 1;
4171                drop(depth);
4172                return self.rollback_to_savepoint(&sp_name);
4173            }
4174        }
4175
4176        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
4177            grafeo_common::utils::error::Error::Transaction(
4178                grafeo_common::utils::error::TransactionError::InvalidState(
4179                    "No active transaction".to_string(),
4180                ),
4181            )
4182        })?;
4183
4184        // Reset read-only flag
4185        *self.read_only_tx.lock() = self.db_read_only;
4186
4187        // Discard uncommitted versions in ALL touched LPG stores (cross-graph atomicity).
4188        let touched = self.touched_graphs.lock().clone();
4189        for graph_name in &touched {
4190            let store = self.resolve_store(graph_name);
4191            store.discard_uncommitted_versions(transaction_id);
4192        }
4193
4194        // Discard pending operations in the RDF store
4195        #[cfg(feature = "triple-store")]
4196        self.rollback_rdf_transaction(transaction_id);
4197
4198        // Discard buffered CDC events on rollback
4199        #[cfg(feature = "cdc")]
4200        if let Some(ref pending) = self.cdc_pending_events {
4201            pending.lock().clear();
4202        }
4203
4204        // Clear savepoints and touched graphs
4205        self.savepoints.lock().clear();
4206        self.touched_graphs.lock().clear();
4207
4208        // Mark transaction as aborted in the manager
4209        let result = self.transaction_manager.abort(transaction_id);
4210
4211        #[cfg(feature = "metrics")]
4212        if result.is_ok() {
4213            crate::metrics::record_metric!(self.metrics, tx_active, dec);
4214            crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
4215            #[cfg(not(target_arch = "wasm32"))]
4216            if let Some(start) = self.tx_start_time.lock().take() {
4217                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
4218                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
4219            }
4220        }
4221
4222        result
4223    }
4224
4225    /// Creates a named savepoint within the current transaction.
4226    ///
4227    /// The savepoint captures the current node/edge ID counters so that
4228    /// [`rollback_to_savepoint`](Self::rollback_to_savepoint) can discard
4229    /// entities created after this point.
4230    ///
4231    /// # Errors
4232    ///
4233    /// Returns an error if no transaction is active.
4234    #[cfg(feature = "lpg")]
4235    pub fn savepoint(&self, name: &str) -> Result<()> {
4236        let tx_id = self.current_transaction.lock().ok_or_else(|| {
4237            grafeo_common::utils::error::Error::Transaction(
4238                grafeo_common::utils::error::TransactionError::InvalidState(
4239                    "No active transaction".to_string(),
4240                ),
4241            )
4242        })?;
4243
4244        // Capture state for every graph touched so far.
4245        let touched = self.touched_graphs.lock().clone();
4246        let graph_snapshots: Vec<GraphSavepoint> = touched
4247            .iter()
4248            .map(|graph_name| {
4249                let store = self.resolve_store(graph_name);
4250                GraphSavepoint {
4251                    graph_name: graph_name.clone(),
4252                    next_node_id: store.peek_next_node_id(),
4253                    next_edge_id: store.peek_next_edge_id(),
4254                    undo_log_position: store.property_undo_log_position(tx_id),
4255                }
4256            })
4257            .collect();
4258
4259        self.savepoints.lock().push(SavepointState {
4260            name: name.to_string(),
4261            graph_snapshots,
4262            active_graph: self.current_graph.lock().clone(),
4263            #[cfg(feature = "cdc")]
4264            cdc_event_position: self
4265                .cdc_pending_events
4266                .as_ref()
4267                .map_or(0, |p| p.lock().len()),
4268        });
4269        Ok(())
4270    }
4271
4272    /// Rolls back to a named savepoint, undoing all writes made after it.
4273    ///
4274    /// The savepoint and any savepoints created after it are removed.
4275    /// Entities with IDs >= the savepoint snapshot are discarded.
4276    ///
4277    /// # Errors
4278    ///
4279    /// Returns an error if no transaction is active or the savepoint does not exist.
4280    #[cfg(feature = "lpg")]
4281    pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
4282        let transaction_id = self.current_transaction.lock().ok_or_else(|| {
4283            grafeo_common::utils::error::Error::Transaction(
4284                grafeo_common::utils::error::TransactionError::InvalidState(
4285                    "No active transaction".to_string(),
4286                ),
4287            )
4288        })?;
4289
4290        let mut savepoints = self.savepoints.lock();
4291
4292        // Find the savepoint by name (search from the end for nested savepoints)
4293        let pos = savepoints
4294            .iter()
4295            .rposition(|sp| sp.name == name)
4296            .ok_or_else(|| {
4297                grafeo_common::utils::error::Error::Transaction(
4298                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
4299                        "Savepoint '{name}' not found"
4300                    )),
4301                )
4302            })?;
4303
4304        let sp_state = savepoints[pos].clone();
4305
4306        // Remove this savepoint and all later ones
4307        savepoints.truncate(pos);
4308        drop(savepoints);
4309
4310        // Roll back each graph that was captured in the savepoint.
4311        for gs in &sp_state.graph_snapshots {
4312            let store = self.resolve_store(&gs.graph_name);
4313
4314            // Replay property/label undo entries recorded after the savepoint
4315            store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);
4316
4317            // Discard entities created after the savepoint
4318            let current_next_node = store.peek_next_node_id();
4319            let current_next_edge = store.peek_next_edge_id();
4320
4321            let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
4322                .map(NodeId::new)
4323                .collect();
4324            let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
4325                .map(EdgeId::new)
4326                .collect();
4327
4328            if !node_ids.is_empty() || !edge_ids.is_empty() {
4329                store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
4330            }
4331        }
4332
4333        // Also roll back any graphs that were touched AFTER the savepoint
4334        // but not captured in it. These need full discard since the savepoint
4335        // didn't include them.
4336        let touched = self.touched_graphs.lock().clone();
4337        for graph_name in &touched {
4338            let already_captured = sp_state
4339                .graph_snapshots
4340                .iter()
4341                .any(|gs| gs.graph_name == *graph_name);
4342            if !already_captured {
4343                let store = self.resolve_store(graph_name);
4344                store.discard_uncommitted_versions(transaction_id);
4345            }
4346        }
4347
4348        // Truncate CDC event buffer to the savepoint position.
4349        #[cfg(feature = "cdc")]
4350        if let Some(ref pending) = self.cdc_pending_events {
4351            pending.lock().truncate(sp_state.cdc_event_position);
4352        }
4353
4354        // Restore touched_graphs to only the graphs that were known at savepoint time.
4355        let mut touched = self.touched_graphs.lock();
4356        touched.clear();
4357        for gs in &sp_state.graph_snapshots {
4358            if !touched.contains(&gs.graph_name) {
4359                touched.push(gs.graph_name.clone());
4360            }
4361        }
4362
4363        Ok(())
4364    }
4365
4366    /// Releases (removes) a named savepoint without rolling back.
4367    ///
4368    /// # Errors
4369    ///
4370    /// Returns an error if no transaction is active or the savepoint does not exist.
4371    pub fn release_savepoint(&self, name: &str) -> Result<()> {
4372        let _tx_id = self.current_transaction.lock().ok_or_else(|| {
4373            grafeo_common::utils::error::Error::Transaction(
4374                grafeo_common::utils::error::TransactionError::InvalidState(
4375                    "No active transaction".to_string(),
4376                ),
4377            )
4378        })?;
4379
4380        let mut savepoints = self.savepoints.lock();
4381        let pos = savepoints
4382            .iter()
4383            .rposition(|sp| sp.name == name)
4384            .ok_or_else(|| {
4385                grafeo_common::utils::error::Error::Transaction(
4386                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
4387                        "Savepoint '{name}' not found"
4388                    )),
4389                )
4390            })?;
4391        savepoints.remove(pos);
4392        Ok(())
4393    }
4394
4395    /// Returns whether a transaction is active.
4396    #[must_use]
4397    pub fn in_transaction(&self) -> bool {
4398        self.current_transaction.lock().is_some()
4399    }
4400
4401    /// Returns the current transaction ID, if any.
4402    #[must_use]
4403    pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
4404        *self.current_transaction.lock()
4405    }
4406
4407    /// Returns a reference to the transaction manager.
4408    #[must_use]
4409    pub(crate) fn transaction_manager(&self) -> &TransactionManager {
4410        &self.transaction_manager
4411    }
4412
4413    /// Returns the store's current node count and the count at transaction start.
4414    #[cfg(feature = "lpg")]
4415    #[must_use]
4416    pub(crate) fn node_count_delta(&self) -> (usize, usize) {
4417        (
4418            self.transaction_start_node_count.load(Ordering::Relaxed),
4419            self.active_lpg_store().node_count(),
4420        )
4421    }
4422
4423    /// Returns the store's current edge count and the count at transaction start.
4424    #[cfg(feature = "lpg")]
4425    #[must_use]
4426    pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
4427        (
4428            self.transaction_start_edge_count.load(Ordering::Relaxed),
4429            self.active_lpg_store().edge_count(),
4430        )
4431    }
4432
4433    /// Prepares the current transaction for a two-phase commit.
4434    ///
4435    /// Returns a [`PreparedCommit`](crate::transaction::PreparedCommit) that
4436    /// lets you inspect pending changes and attach metadata before finalizing.
4437    /// The mutable borrow prevents concurrent operations while the commit is
4438    /// pending.
4439    ///
4440    /// If the `PreparedCommit` is dropped without calling `commit()` or
4441    /// `abort()`, the transaction is automatically rolled back.
4442    ///
4443    /// # Errors
4444    ///
4445    /// Returns an error if no transaction is active.
4446    ///
4447    /// # Examples
4448    ///
4449    /// ```no_run
4450    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
4451    /// use grafeo_engine::GrafeoDB;
4452    ///
4453    /// let db = GrafeoDB::new_in_memory();
4454    /// let mut session = db.session();
4455    ///
4456    /// session.begin_transaction()?;
4457    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
4458    ///
4459    /// let mut prepared = session.prepare_commit()?;
4460    /// println!("Nodes written: {}", prepared.info().nodes_written);
4461    /// prepared.set_metadata("audit_user", "admin");
4462    /// prepared.commit()?;
4463    /// # Ok(())
4464    /// # }
4465    /// ```
4466    #[cfg(feature = "lpg")]
4467    pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
4468        crate::transaction::PreparedCommit::new(self)
4469    }
4470
4471    /// Sets auto-commit mode.
4472    pub fn set_auto_commit(&mut self, auto_commit: bool) {
4473        self.auto_commit = auto_commit;
4474    }
4475
4476    /// Returns whether auto-commit is enabled.
4477    #[must_use]
4478    pub fn auto_commit(&self) -> bool {
4479        self.auto_commit
4480    }
4481
4482    /// Returns `true` if auto-commit should wrap this execution.
4483    ///
4484    /// Auto-commit kicks in when: the session is in auto-commit mode,
4485    /// no explicit transaction is active, and the query mutates data.
4486    fn needs_auto_commit(&self, has_mutations: bool) -> bool {
4487        self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
4488    }
4489
4490    /// Wraps `body` in an automatic begin/commit when [`needs_auto_commit`]
4491    /// returns `true`. On error the transaction is rolled back.
4492    #[cfg(feature = "lpg")]
4493    fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
4494    where
4495        F: FnOnce() -> Result<QueryResult>,
4496    {
4497        if self.needs_auto_commit(has_mutations) {
4498            self.begin_transaction_inner(false, None)?;
4499            match body() {
4500                Ok(result) => {
4501                    self.commit_inner()?;
4502                    Ok(result)
4503                }
4504                Err(e) => {
4505                    let _ = self.rollback_inner();
4506                    Err(e)
4507                }
4508            }
4509        } else {
4510            body()
4511        }
4512    }
4513
4514    /// Non-LPG stub: no auto-commit wrapping (SPARQL UPDATE is atomic).
4515    #[cfg(not(feature = "lpg"))]
4516    fn with_auto_commit<F>(&self, _has_mutations: bool, body: F) -> Result<QueryResult>
4517    where
4518        F: FnOnce() -> Result<QueryResult>,
4519    {
4520        body()
4521    }
4522
4523    /// Quick heuristic: returns `true` when the query text looks like it
4524    /// performs a mutation. Used by `_with_params` paths that go through the
4525    /// `QueryProcessor` (where the logical plan isn't available before
4526    /// execution). False negatives are harmless: the data just won't be
4527    /// auto-committed, which matches the prior behaviour.
4528    fn query_looks_like_mutation(query: &str) -> bool {
4529        let upper = query.to_ascii_uppercase();
4530        upper.contains("INSERT")
4531            || upper.contains("CREATE")
4532            || upper.contains("DELETE")
4533            || upper.contains("MERGE")
4534            || upper.contains("SET")
4535            || upper.contains("REMOVE")
4536            || upper.contains("DROP")
4537            || upper.contains("ALTER")
4538    }
4539
4540    /// Returns `Err(Transaction(InvalidState))` if any `ResultStream` is
4541    /// still outstanding for this session.
4542    #[cfg(feature = "lpg")]
4543    fn check_no_active_streams(&self, op: &str) -> Result<()> {
4544        if self.active_streams.load(Ordering::Acquire) > 0 {
4545            return Err(grafeo_common::utils::error::Error::Transaction(
4546                grafeo_common::utils::error::TransactionError::InvalidState(format!(
4547                    "Cannot {op} while streaming results are active; drop the stream first"
4548                )),
4549            ));
4550        }
4551        Ok(())
4552    }
4553
4554    /// Computes the wall-clock deadline for query execution.
4555    #[must_use]
4556    fn query_deadline(&self) -> Option<Instant> {
4557        #[cfg(not(target_arch = "wasm32"))]
4558        {
4559            self.query_timeout.map(|d| Instant::now() + d)
4560        }
4561        #[cfg(target_arch = "wasm32")]
4562        {
4563            let _ = &self.query_timeout;
4564            None
4565        }
4566    }
4567
4568    /// Creates an executor with deadline and timeout duration configured.
4569    fn make_executor(&self, columns: Vec<String>) -> Executor {
4570        Executor::with_columns(columns)
4571            .with_deadline(self.query_deadline())
4572            .with_timeout_duration(self.query_timeout)
4573    }
4574
4575    /// Creates a per-query `OperatorMemoryContext` for memory-aware spilling.
4576    ///
4577    /// Returns `None` if the buffer manager is not configured or has no spill path,
4578    /// in which case operators will use row-count fallback thresholds.
4579    #[cfg(feature = "spill")]
4580    fn make_operator_memory_context(
4581        &self,
4582    ) -> Option<grafeo_core::execution::OperatorMemoryContext> {
4583        let bm = self.buffer_manager.as_ref()?;
4584        let spill_path = bm.config().spill_path.as_ref()?;
4585        // Per-query isolation: create a unique subdirectory
4586        let query_id = self
4587            .commit_counter
4588            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
4589        let query_dir = spill_path.join(format!("query_{query_id}"));
4590        let sm = std::sync::Arc::new(grafeo_core::execution::SpillManager::new(&query_dir).ok()?);
4591        Some(grafeo_core::execution::OperatorMemoryContext::new(
4592            std::sync::Arc::clone(bm),
4593            sm,
4594        ))
4595    }
4596
4597    /// Checks that a property value does not exceed the configured size limit.
4598    fn check_property_size(&self, key: &str, value: &Value) -> Result<()> {
4599        if let Some(limit) = self.max_property_size {
4600            let size = value.estimated_size_bytes();
4601            if size > limit {
4602                let limit_display = if limit >= 1024 * 1024 && limit % (1024 * 1024) == 0 {
4603                    format!("{} MiB", limit / (1024 * 1024))
4604                } else if limit >= 1024 && limit % 1024 == 0 {
4605                    format!("{} KiB", limit / 1024)
4606                } else {
4607                    format!("{limit} bytes")
4608                };
4609                return Err(grafeo_common::utils::error::Error::Query(
4610                    grafeo_common::utils::error::QueryError::new(
4611                        grafeo_common::utils::error::QueryErrorKind::Execution,
4612                        format!(
4613                            "Property '{key}' value exceeds maximum size of {limit_display} ({size} bytes)"
4614                        ),
4615                    )
4616                    .with_hint(
4617                        "Increase with Config::with_max_property_size() or disable with Config::without_max_property_size()".to_string(),
4618                    ),
4619                ));
4620            }
4621        }
4622        Ok(())
4623    }
4624
4625    /// Records query metrics for any language.
4626    ///
4627    /// Called after query execution to update counters, latency histogram,
4628    /// and per-language tracking. `elapsed_ms` should be `None` on WASM
4629    /// where `Instant` is unavailable.
4630    #[cfg(feature = "metrics")]
4631    fn record_query_metrics(
4632        &self,
4633        language: &str,
4634        elapsed_ms: Option<f64>,
4635        result: &Result<crate::database::QueryResult>,
4636    ) {
4637        use crate::metrics::record_metric;
4638
4639        record_metric!(self.metrics, query_count, inc);
4640        if let Some(ref reg) = self.metrics {
4641            reg.query_count_by_language.increment(language);
4642        }
4643        if let Some(ms) = elapsed_ms {
4644            record_metric!(self.metrics, query_latency, observe ms);
4645        }
4646        match result {
4647            Ok(r) => {
4648                let returned = r.rows.len() as u64;
4649                record_metric!(self.metrics, rows_returned, add returned);
4650                if let Some(scanned) = r.rows_scanned {
4651                    record_metric!(self.metrics, rows_scanned, add scanned);
4652                }
4653            }
4654            Err(e) => {
4655                record_metric!(self.metrics, query_errors, inc);
4656                // Detect timeout errors
4657                let msg = e.to_string();
4658                if msg.contains("exceeded timeout") {
4659                    record_metric!(self.metrics, query_timeouts, inc);
4660                }
4661            }
4662        }
4663    }
4664
4665    /// Evaluates a simple integer literal from a session parameter expression.
4666    #[cfg(feature = "gql")]
4667    fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
4668        use grafeo_adapters::query::gql::ast::{Expression, Literal};
4669        match expr {
4670            Expression::Literal(Literal::Integer(n)) => Some(*n),
4671            _ => None,
4672        }
4673    }
4674
4675    /// Returns the current transaction context for MVCC visibility.
4676    ///
4677    /// Returns `(viewing_epoch, transaction_id)` where:
4678    /// - `viewing_epoch` is the epoch at which to check version visibility
4679    /// - `transaction_id` is the current transaction ID (if in a transaction)
4680    #[must_use]
4681    fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
4682        // Time-travel override takes precedence (read-only, no tx context)
4683        if let Some(epoch) = *self.viewing_epoch_override.lock() {
4684            return (epoch, None);
4685        }
4686
4687        if let Some(transaction_id) = *self.current_transaction.lock() {
4688            // In a transaction: use the transaction's start epoch
4689            let epoch = self
4690                .transaction_manager
4691                .start_epoch(transaction_id)
4692                .unwrap_or_else(|| self.transaction_manager.current_epoch());
4693            (epoch, Some(transaction_id))
4694        } else {
4695            // No transaction: use current epoch
4696            (self.transaction_manager.current_epoch(), None)
4697        }
4698    }
4699
4700    /// Creates a planner with transaction context and constraint validator.
4701    ///
4702    /// The `store` parameter is the graph store to plan against (use
4703    /// `self.active_store()` for graph-aware execution).
4704    fn create_planner_for_store(
4705        &self,
4706        store: Arc<dyn GraphStoreSearch>,
4707        viewing_epoch: EpochId,
4708        transaction_id: Option<TransactionId>,
4709    ) -> crate::query::Planner {
4710        self.create_planner_for_store_with_read_only(store, viewing_epoch, transaction_id, false)
4711    }
4712
4713    fn create_planner_for_store_with_read_only(
4714        &self,
4715        store: Arc<dyn GraphStoreSearch>,
4716        viewing_epoch: EpochId,
4717        transaction_id: Option<TransactionId>,
4718        read_only: bool,
4719    ) -> crate::query::Planner {
4720        use crate::query::Planner;
4721        use grafeo_core::execution::operators::{LazyValue, SessionContext};
4722
4723        // Capture store reference for lazy introspection (only computed if info()/schema() called).
4724        let info_store = Arc::clone(&store);
4725        let schema_store = Arc::clone(&store);
4726
4727        let session_context = SessionContext {
4728            current_schema: self.current_schema(),
4729            current_graph: self.current_graph(),
4730            db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
4731            schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
4732        };
4733
4734        let write_store = self.active_write_store();
4735
4736        let mut planner = Planner::with_context(
4737            Arc::clone(&store),
4738            write_store,
4739            Arc::clone(&self.transaction_manager),
4740            transaction_id,
4741            viewing_epoch,
4742        )
4743        .with_factorized_execution(self.factorized_execution)
4744        .with_catalog(Arc::clone(&self.catalog))
4745        .with_session_context(session_context)
4746        .with_read_only(read_only);
4747
4748        // Attach the LPG store so CALL grafeo.search.* procedures can reach
4749        // HNSW / BM25 indexes. Skip when the session is backed by an external
4750        // store — `self.store` is an empty placeholder in that case and would
4751        // make search procedures see a store with no data or indexes.
4752        #[cfg(feature = "lpg")]
4753        if matches!(self.lpg_backend, LpgBackend::Active) {
4754            planner = planner.with_lpg_store(Arc::clone(&self.store));
4755        }
4756
4757        // Attach the constraint validator for schema enforcement and property size limits
4758        let validator = CatalogConstraintValidator::new(Arc::clone(&self.catalog))
4759            .with_store(store)
4760            .with_max_property_size(self.max_property_size);
4761        planner = planner.with_validator(Arc::new(validator));
4762
4763        planner
4764    }
4765
4766    /// Builds a `Value::Map` for the `info()` introspection function.
4767    fn build_info_value(store: &dyn GraphStore) -> Value {
4768        use grafeo_common::types::PropertyKey;
4769        use std::collections::BTreeMap;
4770
4771        let mut map = BTreeMap::new();
4772        map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
4773        // reason: node/edge counts will not exceed i64::MAX
4774        #[allow(clippy::cast_possible_wrap)]
4775        let node_count = store.node_count() as i64;
4776        // reason: value is a small counter, well within i64::MAX
4777        #[allow(clippy::cast_possible_wrap)]
4778        let edge_count = store.edge_count() as i64;
4779        map.insert(PropertyKey::from("node_count"), Value::Int64(node_count));
4780        map.insert(PropertyKey::from("edge_count"), Value::Int64(edge_count));
4781        map.insert(
4782            PropertyKey::from("version"),
4783            Value::String(env!("CARGO_PKG_VERSION").into()),
4784        );
4785        Value::Map(map.into())
4786    }
4787
4788    /// Builds a `Value::Map` for the `schema()` introspection function.
4789    fn build_schema_value(store: &dyn GraphStore) -> Value {
4790        use grafeo_common::types::PropertyKey;
4791        use std::collections::BTreeMap;
4792
4793        let labels: Vec<Value> = store
4794            .all_labels()
4795            .into_iter()
4796            .map(|l| Value::String(l.into()))
4797            .collect();
4798        let edge_types: Vec<Value> = store
4799            .all_edge_types()
4800            .into_iter()
4801            .map(|t| Value::String(t.into()))
4802            .collect();
4803        let property_keys: Vec<Value> = store
4804            .all_property_keys()
4805            .into_iter()
4806            .map(|k| Value::String(k.into()))
4807            .collect();
4808
4809        let mut map = BTreeMap::new();
4810        map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
4811        map.insert(
4812            PropertyKey::from("edge_types"),
4813            Value::List(edge_types.into()),
4814        );
4815        map.insert(
4816            PropertyKey::from("property_keys"),
4817            Value::List(property_keys.into()),
4818        );
4819        Value::Map(map.into())
4820    }
4821
4822    /// Creates a node directly (bypassing query execution).
4823    ///
4824    /// This is a low-level API for testing and direct manipulation.
4825    /// If a transaction is active, the node will be versioned with the transaction ID.
4826    #[cfg(feature = "lpg")]
4827    pub fn create_node(&self, labels: &[&str]) -> NodeId {
4828        let (epoch, transaction_id) = self.get_transaction_context();
4829        self.active_lpg_store().create_node_versioned(
4830            labels,
4831            epoch,
4832            transaction_id.unwrap_or(TransactionId::SYSTEM),
4833        )
4834    }
4835
4836    /// Creates a node with properties.
4837    ///
4838    /// If a transaction is active, the node will be versioned with the transaction ID.
4839    ///
4840    /// # Errors
4841    ///
4842    /// Returns an error if any property value exceeds the configured `max_property_size`.
4843    #[cfg(feature = "lpg")]
4844    pub fn create_node_with_props<'a>(
4845        &self,
4846        labels: &[&str],
4847        properties: impl IntoIterator<Item = (&'a str, Value)>,
4848    ) -> Result<NodeId> {
4849        let props: Vec<(&str, Value)> = properties.into_iter().collect();
4850        for (key, value) in &props {
4851            self.check_property_size(key, value)?;
4852        }
4853        let (epoch, transaction_id) = self.get_transaction_context();
4854        Ok(self.active_lpg_store().create_node_with_props_versioned(
4855            labels,
4856            props,
4857            epoch,
4858            transaction_id.unwrap_or(TransactionId::SYSTEM),
4859        ))
4860    }
4861
4862    /// Creates an edge between two nodes.
4863    ///
4864    /// This is a low-level API for testing and direct manipulation.
4865    /// If a transaction is active, the edge will be versioned with the transaction ID.
4866    #[cfg(feature = "lpg")]
4867    pub fn create_edge(
4868        &self,
4869        src: NodeId,
4870        dst: NodeId,
4871        edge_type: &str,
4872    ) -> grafeo_common::types::EdgeId {
4873        let (epoch, transaction_id) = self.get_transaction_context();
4874        self.active_lpg_store().create_edge_versioned(
4875            src,
4876            dst,
4877            edge_type,
4878            epoch,
4879            transaction_id.unwrap_or(TransactionId::SYSTEM),
4880        )
4881    }
4882
4883    /// Creates an edge with properties within the active transaction context.
4884    ///
4885    /// # Errors
4886    ///
4887    /// Returns an error if any property value exceeds the configured `max_property_size`.
4888    #[cfg(feature = "lpg")]
4889    pub fn create_edge_with_props<'a>(
4890        &self,
4891        src: NodeId,
4892        dst: NodeId,
4893        edge_type: &str,
4894        properties: impl IntoIterator<Item = (&'a str, Value)>,
4895    ) -> Result<grafeo_common::types::EdgeId> {
4896        let props: Vec<(&str, Value)> = properties.into_iter().collect();
4897        for (key, value) in &props {
4898            self.check_property_size(key, value)?;
4899        }
4900        let (epoch, transaction_id) = self.get_transaction_context();
4901        let tid = transaction_id.unwrap_or(TransactionId::SYSTEM);
4902        let store = self.active_lpg_store();
4903        let eid = store.create_edge_versioned(src, dst, edge_type, epoch, tid);
4904        for (key, value) in props {
4905            store.set_edge_property_versioned(eid, key, value, tid);
4906        }
4907        Ok(eid)
4908    }
4909
4910    /// Sets a node property within the active transaction context.
4911    ///
4912    /// # Errors
4913    ///
4914    /// Returns an error if the value exceeds the configured `max_property_size`.
4915    #[cfg(feature = "lpg")]
4916    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) -> Result<()> {
4917        self.check_property_size(key, &value)?;
4918        let (_, transaction_id) = self.get_transaction_context();
4919        if let Some(tid) = transaction_id {
4920            self.active_lpg_store()
4921                .set_node_property_versioned(id, key, value, tid);
4922        } else {
4923            self.active_lpg_store().set_node_property(id, key, value);
4924        }
4925        Ok(())
4926    }
4927
4928    /// Sets an edge property within the active transaction context.
4929    ///
4930    /// # Errors
4931    ///
4932    /// Returns an error if the value exceeds the configured `max_property_size`.
4933    #[cfg(feature = "lpg")]
4934    pub fn set_edge_property(
4935        &self,
4936        id: grafeo_common::types::EdgeId,
4937        key: &str,
4938        value: Value,
4939    ) -> Result<()> {
4940        self.check_property_size(key, &value)?;
4941        let (_, transaction_id) = self.get_transaction_context();
4942        if let Some(tid) = transaction_id {
4943            self.active_lpg_store()
4944                .set_edge_property_versioned(id, key, value, tid);
4945        } else {
4946            self.active_lpg_store().set_edge_property(id, key, value);
4947        }
4948        Ok(())
4949    }
4950
4951    /// Deletes a node within the active transaction context.
4952    #[cfg(feature = "lpg")]
4953    pub fn delete_node(&self, id: NodeId) -> bool {
4954        let (epoch, transaction_id) = self.get_transaction_context();
4955        if let Some(tid) = transaction_id {
4956            self.active_lpg_store()
4957                .delete_node_versioned(id, epoch, tid)
4958        } else {
4959            self.active_lpg_store().delete_node(id)
4960        }
4961    }
4962
4963    /// Deletes an edge within the active transaction context.
4964    #[cfg(feature = "lpg")]
4965    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
4966        let (epoch, transaction_id) = self.get_transaction_context();
4967        if let Some(tid) = transaction_id {
4968            self.active_lpg_store()
4969                .delete_edge_versioned(id, epoch, tid)
4970        } else {
4971            self.active_lpg_store().delete_edge(id)
4972        }
4973    }
4974
4975    // =========================================================================
4976    // Direct Lookup APIs (bypass query planning for O(1) point reads)
4977    // =========================================================================
4978
4979    /// Gets a node by ID directly, bypassing query planning.
4980    ///
4981    /// This is the fastest way to retrieve a single node when you know its ID.
4982    /// Skips parsing, binding, optimization, and physical planning entirely.
4983    ///
4984    /// # Performance
4985    ///
4986    /// - Time complexity: O(1) average case
4987    /// - No lock contention (uses DashMap internally)
4988    /// - ~20-30x faster than equivalent MATCH query
4989    ///
4990    /// # Example
4991    ///
4992    /// ```no_run
4993    /// # use grafeo_engine::GrafeoDB;
4994    /// # let db = GrafeoDB::new_in_memory();
4995    /// let session = db.session();
4996    /// let node_id = session.create_node(&["Person"]);
4997    ///
4998    /// // Direct lookup - O(1), no query planning
4999    /// let node = session.get_node(node_id);
5000    /// assert!(node.is_some());
5001    /// ```
5002    #[cfg(feature = "lpg")]
5003    #[must_use]
5004    pub fn get_node(&self, id: NodeId) -> Option<Node> {
5005        let (epoch, transaction_id) = self.get_transaction_context();
5006        self.active_lpg_store().get_node_versioned(
5007            id,
5008            epoch,
5009            transaction_id.unwrap_or(TransactionId::SYSTEM),
5010        )
5011    }
5012
5013    /// Gets a single property from a node by ID, bypassing query planning.
5014    ///
5015    /// More efficient than `get_node()` when you only need one property,
5016    /// as it avoids loading the full node with all properties.
5017    ///
5018    /// # Performance
5019    ///
5020    /// - Time complexity: O(1) average case
5021    /// - No query planning overhead
5022    ///
5023    /// # Example
5024    ///
5025    /// ```no_run
5026    /// # use grafeo_engine::GrafeoDB;
5027    /// # use grafeo_common::types::Value;
5028    /// # let db = GrafeoDB::new_in_memory();
5029    /// let session = db.session();
5030    /// let id = session.create_node_with_props(&["Person"], [("name", "Alix".into())]).unwrap();
5031    ///
5032    /// // Direct property access - O(1)
5033    /// let name = session.get_node_property(id, "name");
5034    /// assert_eq!(name, Some(Value::String("Alix".into())));
5035    /// ```
5036    #[cfg(feature = "lpg")]
5037    #[must_use]
5038    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
5039        self.get_node(id)
5040            .and_then(|node| node.get_property(key).cloned())
5041    }
5042
5043    /// Gets an edge by ID directly, bypassing query planning.
5044    ///
5045    /// # Performance
5046    ///
5047    /// - Time complexity: O(1) average case
5048    /// - No lock contention
5049    #[cfg(feature = "lpg")]
5050    #[must_use]
5051    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
5052        let (epoch, transaction_id) = self.get_transaction_context();
5053        self.active_lpg_store().get_edge_versioned(
5054            id,
5055            epoch,
5056            transaction_id.unwrap_or(TransactionId::SYSTEM),
5057        )
5058    }
5059
5060    /// Gets outgoing neighbors of a node directly, bypassing query planning.
5061    ///
5062    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
5063    ///
5064    /// # Performance
5065    ///
5066    /// - Time complexity: O(degree) where degree is the number of outgoing edges
5067    /// - Uses adjacency index for direct access
5068    /// - ~10-20x faster than equivalent MATCH query
5069    ///
5070    /// # Example
5071    ///
5072    /// ```no_run
5073    /// # use grafeo_engine::GrafeoDB;
5074    /// # let db = GrafeoDB::new_in_memory();
5075    /// let session = db.session();
5076    /// let alix = session.create_node(&["Person"]);
5077    /// let gus = session.create_node(&["Person"]);
5078    /// session.create_edge(alix, gus, "KNOWS");
5079    ///
5080    /// // Direct neighbor lookup - O(degree)
5081    /// let neighbors = session.get_neighbors_outgoing(alix);
5082    /// assert_eq!(neighbors.len(), 1);
5083    /// assert_eq!(neighbors[0].0, gus);
5084    /// ```
5085    #[cfg(feature = "lpg")]
5086    #[must_use]
5087    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
5088        self.active_lpg_store()
5089            .edges_from(node, Direction::Outgoing)
5090            .collect()
5091    }
5092
5093    /// Gets incoming neighbors of a node directly, bypassing query planning.
5094    ///
5095    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
5096    ///
5097    /// # Performance
5098    ///
5099    /// - Time complexity: O(degree) where degree is the number of incoming edges
5100    /// - Uses backward adjacency index for direct access
5101    #[cfg(feature = "lpg")]
5102    #[must_use]
5103    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
5104        self.active_lpg_store()
5105            .edges_from(node, Direction::Incoming)
5106            .collect()
5107    }
5108
5109    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
5110    ///
5111    /// # Example
5112    ///
5113    /// ```no_run
5114    /// # use grafeo_engine::GrafeoDB;
5115    /// # let db = GrafeoDB::new_in_memory();
5116    /// # let session = db.session();
5117    /// # let alix = session.create_node(&["Person"]);
5118    /// let neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
5119    /// ```
5120    #[cfg(feature = "lpg")]
5121    #[must_use]
5122    pub fn get_neighbors_outgoing_by_type(
5123        &self,
5124        node: NodeId,
5125        edge_type: &str,
5126    ) -> Vec<(NodeId, EdgeId)> {
5127        self.active_lpg_store()
5128            .edges_from(node, Direction::Outgoing)
5129            .filter(|(_, edge_id)| {
5130                self.get_edge(*edge_id)
5131                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
5132            })
5133            .collect()
5134    }
5135
5136    /// Checks if a node exists, bypassing query planning.
5137    ///
5138    /// # Performance
5139    ///
5140    /// - Time complexity: O(1)
5141    /// - Fastest existence check available
5142    #[cfg(feature = "lpg")]
5143    #[must_use]
5144    pub fn node_exists(&self, id: NodeId) -> bool {
5145        self.get_node(id).is_some()
5146    }
5147
5148    /// Checks if an edge exists, bypassing query planning.
5149    #[cfg(feature = "lpg")]
5150    #[must_use]
5151    pub fn edge_exists(&self, id: EdgeId) -> bool {
5152        self.get_edge(id).is_some()
5153    }
5154
5155    /// Gets the degree (number of edges) of a node.
5156    ///
5157    /// Returns (outgoing_degree, incoming_degree).
5158    #[cfg(feature = "lpg")]
5159    #[must_use]
5160    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
5161        let active = self.active_lpg_store();
5162        let out = active.out_degree(node);
5163        let in_degree = active.in_degree(node);
5164        (out, in_degree)
5165    }
5166
5167    /// Batch lookup of multiple nodes by ID.
5168    ///
5169    /// More efficient than calling `get_node()` in a loop because it
5170    /// amortizes overhead.
5171    ///
5172    /// # Performance
5173    ///
5174    /// - Time complexity: O(n) where n is the number of IDs
5175    /// - Better cache utilization than individual lookups
5176    #[cfg(feature = "lpg")]
5177    #[must_use]
5178    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
5179        let (epoch, transaction_id) = self.get_transaction_context();
5180        let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
5181        let active = self.active_lpg_store();
5182        ids.iter()
5183            .map(|&id| active.get_node_versioned(id, epoch, tx))
5184            .collect()
5185    }
5186
5187    // ── Change Data Capture ─────────────────────────────────────────────
5188
5189    /// Returns the full change history for an entity (node or edge).
5190    ///
5191    /// # Errors
5192    ///
5193    /// Returns an authorization error if the session lacks read permission.
5194    #[cfg(feature = "cdc")]
5195    pub fn history(
5196        &self,
5197        entity_id: impl Into<crate::cdc::EntityId>,
5198    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
5199        self.require_permission(crate::auth::StatementKind::Read)?;
5200        Ok(self.cdc_log.history(entity_id.into()))
5201    }
5202
5203    /// Returns change events for an entity since the given epoch.
5204    ///
5205    /// # Errors
5206    ///
5207    /// Returns an authorization error if the session lacks read permission.
5208    #[cfg(feature = "cdc")]
5209    pub fn history_since(
5210        &self,
5211        entity_id: impl Into<crate::cdc::EntityId>,
5212        since_epoch: EpochId,
5213    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
5214        self.require_permission(crate::auth::StatementKind::Read)?;
5215        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
5216    }
5217
5218    /// Returns all change events across all entities in an epoch range.
5219    ///
5220    /// # Errors
5221    ///
5222    /// Returns an authorization error if the session lacks read permission.
5223    #[cfg(feature = "cdc")]
5224    pub fn changes_between(
5225        &self,
5226        start_epoch: EpochId,
5227        end_epoch: EpochId,
5228    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
5229        self.require_permission(crate::auth::StatementKind::Read)?;
5230        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
5231    }
5232}
5233
5234impl Drop for Session {
5235    fn drop(&mut self) {
5236        // Auto-rollback any active transaction to prevent leaked MVCC state,
5237        // dangling write locks, and uncommitted versions lingering in the store.
5238        #[cfg(feature = "lpg")]
5239        if self.in_transaction() {
5240            let _ = self.rollback_inner();
5241        }
5242
5243        #[cfg(feature = "metrics")]
5244        if let Some(ref reg) = self.metrics {
5245            reg.session_active
5246                .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
5247        }
5248    }
5249}
5250
5251#[cfg(test)]
5252mod tests {
5253    use super::parse_default_literal;
5254    use crate::database::GrafeoDB;
5255    use grafeo_common::types::Value;
5256
5257    // -----------------------------------------------------------------------
5258    // parse_default_literal
5259    // -----------------------------------------------------------------------
5260
5261    #[test]
5262    fn parse_default_literal_null() {
5263        assert_eq!(parse_default_literal("null"), Value::Null);
5264        assert_eq!(parse_default_literal("NULL"), Value::Null);
5265        assert_eq!(parse_default_literal("Null"), Value::Null);
5266    }
5267
5268    #[test]
5269    fn parse_default_literal_bool() {
5270        assert_eq!(parse_default_literal("true"), Value::Bool(true));
5271        assert_eq!(parse_default_literal("TRUE"), Value::Bool(true));
5272        assert_eq!(parse_default_literal("false"), Value::Bool(false));
5273        assert_eq!(parse_default_literal("FALSE"), Value::Bool(false));
5274    }
5275
5276    #[test]
5277    fn parse_default_literal_string_single_quoted() {
5278        assert_eq!(
5279            parse_default_literal("'hello'"),
5280            Value::String("hello".into())
5281        );
5282    }
5283
5284    #[test]
5285    fn parse_default_literal_string_double_quoted() {
5286        assert_eq!(
5287            parse_default_literal("\"world\""),
5288            Value::String("world".into())
5289        );
5290    }
5291
5292    #[test]
5293    fn parse_default_literal_integer() {
5294        assert_eq!(parse_default_literal("42"), Value::Int64(42));
5295        assert_eq!(parse_default_literal("-7"), Value::Int64(-7));
5296        assert_eq!(parse_default_literal("0"), Value::Int64(0));
5297    }
5298
5299    #[test]
5300    fn parse_default_literal_float() {
5301        assert_eq!(parse_default_literal("9.81"), Value::Float64(9.81_f64));
5302        assert_eq!(parse_default_literal("-0.5"), Value::Float64(-0.5));
5303    }
5304
5305    #[test]
5306    fn parse_default_literal_fallback_string() {
5307        // Not a recognized literal, not quoted, not a number
5308        assert_eq!(
5309            parse_default_literal("some_identifier"),
5310            Value::String("some_identifier".into())
5311        );
5312    }
5313
5314    #[test]
5315    fn test_session_create_node() {
5316        let db = GrafeoDB::new_in_memory();
5317        let session = db.session();
5318
5319        let id = session.create_node(&["Person"]);
5320        assert!(id.is_valid());
5321        assert_eq!(db.node_count(), 1);
5322    }
5323
5324    #[test]
5325    fn test_session_transaction() {
5326        let db = GrafeoDB::new_in_memory();
5327        let mut session = db.session();
5328
5329        assert!(!session.in_transaction());
5330
5331        session.begin_transaction().unwrap();
5332        assert!(session.in_transaction());
5333
5334        session.commit().unwrap();
5335        assert!(!session.in_transaction());
5336    }
5337
5338    #[test]
5339    fn test_session_transaction_context() {
5340        let db = GrafeoDB::new_in_memory();
5341        let mut session = db.session();
5342
5343        // Without transaction - context should have current epoch and no transaction_id
5344        let (_epoch1, transaction_id1) = session.get_transaction_context();
5345        assert!(transaction_id1.is_none());
5346
5347        // Start a transaction
5348        session.begin_transaction().unwrap();
5349        let (epoch2, transaction_id2) = session.get_transaction_context();
5350        assert!(transaction_id2.is_some());
5351        // Transaction should have a valid epoch
5352        let _ = epoch2; // Use the variable
5353
5354        // Commit and verify
5355        session.commit().unwrap();
5356        let (epoch3, tx_id3) = session.get_transaction_context();
5357        assert!(tx_id3.is_none());
5358        // Epoch should have advanced after commit
5359        assert!(epoch3.as_u64() >= epoch2.as_u64());
5360    }
5361
5362    #[test]
5363    fn test_session_rollback() {
5364        let db = GrafeoDB::new_in_memory();
5365        let mut session = db.session();
5366
5367        session.begin_transaction().unwrap();
5368        session.rollback().unwrap();
5369        assert!(!session.in_transaction());
5370    }
5371
5372    #[test]
5373    fn test_session_rollback_discards_versions() {
5374        use grafeo_common::types::TransactionId;
5375
5376        let db = GrafeoDB::new_in_memory();
5377
5378        // Create a node outside of any transaction (at system level)
5379        let node_before = db.store().create_node(&["Person"]);
5380        assert!(node_before.is_valid());
5381        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
5382
5383        // Start a transaction
5384        let mut session = db.session();
5385        session.begin_transaction().unwrap();
5386        let transaction_id = session.current_transaction.lock().unwrap();
5387
5388        // Create a node versioned with the transaction's ID
5389        let epoch = db.store().current_epoch();
5390        let node_in_tx = db
5391            .store()
5392            .create_node_versioned(&["Person"], epoch, transaction_id);
5393        assert!(node_in_tx.is_valid());
5394
5395        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
5396        // non-versioned lookups like node_count(). Verify the node is visible
5397        // only through the owning transaction.
5398        assert_eq!(
5399            db.node_count(),
5400            1,
5401            "PENDING nodes should be invisible to non-versioned node_count()"
5402        );
5403        assert!(
5404            db.store()
5405                .get_node_versioned(node_in_tx, epoch, transaction_id)
5406                .is_some(),
5407            "Transaction node should be visible to its own transaction"
5408        );
5409
5410        // Rollback the transaction
5411        session.rollback().unwrap();
5412        assert!(!session.in_transaction());
5413
5414        // The node created in the transaction should be discarded
5415        // Only the first node should remain visible
5416        let count_after = db.node_count();
5417        assert_eq!(
5418            count_after, 1,
5419            "Rollback should discard uncommitted node, but got {count_after}"
5420        );
5421
5422        // The original node should still be accessible
5423        let current_epoch = db.store().current_epoch();
5424        assert!(
5425            db.store()
5426                .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
5427                .is_some(),
5428            "Original node should still exist"
5429        );
5430
5431        // The node created in the transaction should not be accessible
5432        assert!(
5433            db.store()
5434                .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
5435                .is_none(),
5436            "Transaction node should be gone"
5437        );
5438    }
5439
5440    #[test]
5441    fn test_session_create_node_in_transaction() {
5442        // Test that session.create_node() is transaction-aware
5443        let db = GrafeoDB::new_in_memory();
5444
5445        // Create a node outside of any transaction
5446        let node_before = db.create_node(&["Person"]);
5447        assert!(node_before.is_valid());
5448        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
5449
5450        // Start a transaction and create a node through the session
5451        let mut session = db.session();
5452        session.begin_transaction().unwrap();
5453        let transaction_id = session.current_transaction.lock().unwrap();
5454
5455        // Create a node through session.create_node() - should be versioned with tx
5456        let node_in_tx = session.create_node(&["Person"]);
5457        assert!(node_in_tx.is_valid());
5458
5459        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
5460        // non-versioned lookups. Verify the node is visible only to its own tx.
5461        assert_eq!(
5462            db.node_count(),
5463            1,
5464            "PENDING nodes should be invisible to non-versioned node_count()"
5465        );
5466        let epoch = db.store().current_epoch();
5467        assert!(
5468            db.store()
5469                .get_node_versioned(node_in_tx, epoch, transaction_id)
5470                .is_some(),
5471            "Transaction node should be visible to its own transaction"
5472        );
5473
5474        // Rollback the transaction
5475        session.rollback().unwrap();
5476
5477        // The node created via session.create_node() should be discarded
5478        let count_after = db.node_count();
5479        assert_eq!(
5480            count_after, 1,
5481            "Rollback should discard node created via session.create_node(), but got {count_after}"
5482        );
5483    }
5484
5485    #[test]
5486    fn test_session_create_node_with_props_in_transaction() {
5487        use grafeo_common::types::Value;
5488
5489        // Test that session.create_node_with_props() is transaction-aware
5490        let db = GrafeoDB::new_in_memory();
5491
5492        // Create a node outside of any transaction
5493        db.create_node(&["Person"]);
5494        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
5495
5496        // Start a transaction and create a node with properties
5497        let mut session = db.session();
5498        session.begin_transaction().unwrap();
5499        let transaction_id = session.current_transaction.lock().unwrap();
5500
5501        let node_in_tx = session
5502            .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5503            .unwrap();
5504        assert!(node_in_tx.is_valid());
5505
5506        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
5507        // non-versioned lookups. Verify the node is visible only to its own tx.
5508        assert_eq!(
5509            db.node_count(),
5510            1,
5511            "PENDING nodes should be invisible to non-versioned node_count()"
5512        );
5513        let epoch = db.store().current_epoch();
5514        assert!(
5515            db.store()
5516                .get_node_versioned(node_in_tx, epoch, transaction_id)
5517                .is_some(),
5518            "Transaction node should be visible to its own transaction"
5519        );
5520
5521        // Rollback the transaction
5522        session.rollback().unwrap();
5523
5524        // The node should be discarded
5525        let count_after = db.node_count();
5526        assert_eq!(
5527            count_after, 1,
5528            "Rollback should discard node created via session.create_node_with_props()"
5529        );
5530    }
5531
5532    #[cfg(feature = "gql")]
5533    mod gql_tests {
5534        use super::*;
5535
5536        #[test]
5537        fn test_gql_query_execution() {
5538            let db = GrafeoDB::new_in_memory();
5539            let session = db.session();
5540
5541            // Create some test data
5542            session.create_node(&["Person"]);
5543            session.create_node(&["Person"]);
5544            session.create_node(&["Animal"]);
5545
5546            // Execute a GQL query
5547            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
5548
5549            // Should return 2 Person nodes
5550            assert_eq!(result.row_count(), 2);
5551            assert_eq!(result.column_count(), 1);
5552            assert_eq!(result.columns[0], "n");
5553        }
5554
5555        #[test]
5556        fn test_gql_empty_result() {
5557            let db = GrafeoDB::new_in_memory();
5558            let session = db.session();
5559
5560            // No data in database
5561            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
5562
5563            assert_eq!(result.row_count(), 0);
5564        }
5565
5566        #[test]
5567        fn test_gql_parse_error() {
5568            let db = GrafeoDB::new_in_memory();
5569            let session = db.session();
5570
5571            // Invalid GQL syntax
5572            let result = session.execute("MATCH (n RETURN n");
5573
5574            assert!(result.is_err());
5575        }
5576
5577        #[test]
5578        fn test_gql_relationship_traversal() {
5579            let db = GrafeoDB::new_in_memory();
5580            let session = db.session();
5581
5582            // Create a graph: Alix -> Gus, Alix -> Vincent
5583            let alix = session.create_node(&["Person"]);
5584            let gus = session.create_node(&["Person"]);
5585            let vincent = session.create_node(&["Person"]);
5586
5587            session.create_edge(alix, gus, "KNOWS");
5588            session.create_edge(alix, vincent, "KNOWS");
5589
5590            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
5591            let result = session
5592                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
5593                .unwrap();
5594
5595            // Should return 2 rows (Alix->Gus, Alix->Vincent)
5596            assert_eq!(result.row_count(), 2);
5597            assert_eq!(result.column_count(), 2);
5598            assert_eq!(result.columns[0], "a");
5599            assert_eq!(result.columns[1], "b");
5600        }
5601
5602        #[test]
5603        fn test_gql_relationship_with_type_filter() {
5604            let db = GrafeoDB::new_in_memory();
5605            let session = db.session();
5606
5607            // Create a graph: Alix -KNOWS-> Gus, Alix -WORKS_WITH-> Vincent
5608            let alix = session.create_node(&["Person"]);
5609            let gus = session.create_node(&["Person"]);
5610            let vincent = session.create_node(&["Person"]);
5611
5612            session.create_edge(alix, gus, "KNOWS");
5613            session.create_edge(alix, vincent, "WORKS_WITH");
5614
5615            // Query only KNOWS relationships
5616            let result = session
5617                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
5618                .unwrap();
5619
5620            // Should return only 1 row (Alix->Gus)
5621            assert_eq!(result.row_count(), 1);
5622        }
5623
5624        #[test]
5625        fn test_gql_semantic_error_undefined_variable() {
5626            let db = GrafeoDB::new_in_memory();
5627            let session = db.session();
5628
5629            // Reference undefined variable 'x' in RETURN
5630            let result = session.execute("MATCH (n:Person) RETURN x");
5631
5632            // Should fail with semantic error
5633            assert!(result.is_err());
5634            let Err(err) = result else {
5635                panic!("Expected error")
5636            };
5637            assert!(
5638                err.to_string().contains("Undefined variable"),
5639                "Expected undefined variable error, got: {}",
5640                err
5641            );
5642        }
5643
5644        #[test]
5645        fn test_gql_where_clause_property_filter() {
5646            use grafeo_common::types::Value;
5647
5648            let db = GrafeoDB::new_in_memory();
5649            let session = db.session();
5650
5651            // Create people with ages
5652            session
5653                .create_node_with_props(&["Person"], [("age", Value::Int64(25))])
5654                .unwrap();
5655            session
5656                .create_node_with_props(&["Person"], [("age", Value::Int64(35))])
5657                .unwrap();
5658            session
5659                .create_node_with_props(&["Person"], [("age", Value::Int64(45))])
5660                .unwrap();
5661
5662            // Query with WHERE clause: age > 30
5663            let result = session
5664                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
5665                .unwrap();
5666
5667            // Should return 2 people (ages 35 and 45)
5668            assert_eq!(result.row_count(), 2);
5669        }
5670
5671        #[test]
5672        fn test_gql_where_clause_equality() {
5673            use grafeo_common::types::Value;
5674
5675            let db = GrafeoDB::new_in_memory();
5676            let session = db.session();
5677
5678            // Create people with names
5679            session
5680                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5681                .unwrap();
5682            session
5683                .create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))])
5684                .unwrap();
5685            session
5686                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5687                .unwrap();
5688
5689            // Query with WHERE clause: name = "Alix"
5690            let result = session
5691                .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
5692                .unwrap();
5693
5694            // Should return 2 people named Alix
5695            assert_eq!(result.row_count(), 2);
5696        }
5697
5698        #[test]
5699        fn test_gql_return_property_access() {
5700            use grafeo_common::types::Value;
5701
5702            let db = GrafeoDB::new_in_memory();
5703            let session = db.session();
5704
5705            // Create people with names and ages
5706            session
5707                .create_node_with_props(
5708                    &["Person"],
5709                    [
5710                        ("name", Value::String("Alix".into())),
5711                        ("age", Value::Int64(30)),
5712                    ],
5713                )
5714                .unwrap();
5715            session
5716                .create_node_with_props(
5717                    &["Person"],
5718                    [
5719                        ("name", Value::String("Gus".into())),
5720                        ("age", Value::Int64(25)),
5721                    ],
5722                )
5723                .unwrap();
5724
5725            // Query returning properties
5726            let result = session
5727                .execute("MATCH (n:Person) RETURN n.name, n.age")
5728                .unwrap();
5729
5730            // Should return 2 rows with name and age columns
5731            assert_eq!(result.row_count(), 2);
5732            assert_eq!(result.column_count(), 2);
5733            assert_eq!(result.columns[0], "n.name");
5734            assert_eq!(result.columns[1], "n.age");
5735
5736            // Check that we get actual values
5737            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
5738            assert!(names.contains(&&Value::String("Alix".into())));
5739            assert!(names.contains(&&Value::String("Gus".into())));
5740        }
5741
5742        #[test]
5743        fn test_gql_return_mixed_expressions() {
5744            use grafeo_common::types::Value;
5745
5746            let db = GrafeoDB::new_in_memory();
5747            let session = db.session();
5748
5749            // Create a person
5750            session
5751                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5752                .unwrap();
5753
5754            // Query returning both node and property
5755            let result = session
5756                .execute("MATCH (n:Person) RETURN n, n.name")
5757                .unwrap();
5758
5759            assert_eq!(result.row_count(), 1);
5760            assert_eq!(result.column_count(), 2);
5761            assert_eq!(result.columns[0], "n");
5762            assert_eq!(result.columns[1], "n.name");
5763
5764            // Second column should be the name
5765            assert_eq!(result.rows[0][1], Value::String("Alix".into()));
5766        }
5767    }
5768
5769    #[cfg(feature = "cypher")]
5770    mod cypher_tests {
5771        use super::*;
5772
5773        #[test]
5774        fn test_cypher_query_execution() {
5775            let db = GrafeoDB::new_in_memory();
5776            let session = db.session();
5777
5778            // Create some test data
5779            session.create_node(&["Person"]);
5780            session.create_node(&["Person"]);
5781            session.create_node(&["Animal"]);
5782
5783            // Execute a Cypher query
5784            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
5785
5786            // Should return 2 Person nodes
5787            assert_eq!(result.row_count(), 2);
5788            assert_eq!(result.column_count(), 1);
5789            assert_eq!(result.columns[0], "n");
5790        }
5791
5792        #[test]
5793        fn test_cypher_empty_result() {
5794            let db = GrafeoDB::new_in_memory();
5795            let session = db.session();
5796
5797            // No data in database
5798            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
5799
5800            assert_eq!(result.row_count(), 0);
5801        }
5802
5803        #[test]
5804        fn test_cypher_parse_error() {
5805            let db = GrafeoDB::new_in_memory();
5806            let session = db.session();
5807
5808            // Invalid Cypher syntax
5809            let result = session.execute_cypher("MATCH (n RETURN n");
5810
5811            assert!(result.is_err());
5812        }
5813    }
5814
5815    // ==================== Direct Lookup API Tests ====================
5816
5817    mod direct_lookup_tests {
5818        use super::*;
5819        use grafeo_common::types::Value;
5820
5821        #[test]
5822        fn test_get_node() {
5823            let db = GrafeoDB::new_in_memory();
5824            let session = db.session();
5825
5826            let id = session.create_node(&["Person"]);
5827            let node = session.get_node(id);
5828
5829            assert!(node.is_some());
5830            let node = node.unwrap();
5831            assert_eq!(node.id, id);
5832        }
5833
5834        #[test]
5835        fn test_get_node_not_found() {
5836            use grafeo_common::types::NodeId;
5837
5838            let db = GrafeoDB::new_in_memory();
5839            let session = db.session();
5840
5841            // Try to get a non-existent node
5842            let node = session.get_node(NodeId::new(9999));
5843            assert!(node.is_none());
5844        }
5845
5846        #[test]
5847        fn test_get_node_property() {
5848            let db = GrafeoDB::new_in_memory();
5849            let session = db.session();
5850
5851            let id = session
5852                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5853                .unwrap();
5854
5855            let name = session.get_node_property(id, "name");
5856            assert_eq!(name, Some(Value::String("Alix".into())));
5857
5858            // Non-existent property
5859            let missing = session.get_node_property(id, "missing");
5860            assert!(missing.is_none());
5861        }
5862
5863        #[test]
5864        fn test_get_edge() {
5865            let db = GrafeoDB::new_in_memory();
5866            let session = db.session();
5867
5868            let alix = session.create_node(&["Person"]);
5869            let gus = session.create_node(&["Person"]);
5870            let edge_id = session.create_edge(alix, gus, "KNOWS");
5871
5872            let edge = session.get_edge(edge_id);
5873            assert!(edge.is_some());
5874            let edge = edge.unwrap();
5875            assert_eq!(edge.id, edge_id);
5876            assert_eq!(edge.src, alix);
5877            assert_eq!(edge.dst, gus);
5878        }
5879
5880        #[test]
5881        fn test_get_edge_not_found() {
5882            use grafeo_common::types::EdgeId;
5883
5884            let db = GrafeoDB::new_in_memory();
5885            let session = db.session();
5886
5887            let edge = session.get_edge(EdgeId::new(9999));
5888            assert!(edge.is_none());
5889        }
5890
5891        #[test]
5892        fn test_get_neighbors_outgoing() {
5893            let db = GrafeoDB::new_in_memory();
5894            let session = db.session();
5895
5896            let alix = session.create_node(&["Person"]);
5897            let gus = session.create_node(&["Person"]);
5898            let harm = session.create_node(&["Person"]);
5899
5900            session.create_edge(alix, gus, "KNOWS");
5901            session.create_edge(alix, harm, "KNOWS");
5902
5903            let neighbors = session.get_neighbors_outgoing(alix);
5904            assert_eq!(neighbors.len(), 2);
5905
5906            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5907            assert!(neighbor_ids.contains(&gus));
5908            assert!(neighbor_ids.contains(&harm));
5909        }
5910
5911        #[test]
5912        fn test_get_neighbors_incoming() {
5913            let db = GrafeoDB::new_in_memory();
5914            let session = db.session();
5915
5916            let alix = session.create_node(&["Person"]);
5917            let gus = session.create_node(&["Person"]);
5918            let harm = session.create_node(&["Person"]);
5919
5920            session.create_edge(gus, alix, "KNOWS");
5921            session.create_edge(harm, alix, "KNOWS");
5922
5923            let neighbors = session.get_neighbors_incoming(alix);
5924            assert_eq!(neighbors.len(), 2);
5925
5926            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5927            assert!(neighbor_ids.contains(&gus));
5928            assert!(neighbor_ids.contains(&harm));
5929        }
5930
5931        #[test]
5932        fn test_get_neighbors_outgoing_by_type() {
5933            let db = GrafeoDB::new_in_memory();
5934            let session = db.session();
5935
5936            let alix = session.create_node(&["Person"]);
5937            let gus = session.create_node(&["Person"]);
5938            let company = session.create_node(&["Company"]);
5939
5940            session.create_edge(alix, gus, "KNOWS");
5941            session.create_edge(alix, company, "WORKS_AT");
5942
5943            let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
5944            assert_eq!(knows_neighbors.len(), 1);
5945            assert_eq!(knows_neighbors[0].0, gus);
5946
5947            let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
5948            assert_eq!(works_neighbors.len(), 1);
5949            assert_eq!(works_neighbors[0].0, company);
5950
5951            // No edges of this type
5952            let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
5953            assert!(no_neighbors.is_empty());
5954        }
5955
5956        #[test]
5957        fn test_node_exists() {
5958            use grafeo_common::types::NodeId;
5959
5960            let db = GrafeoDB::new_in_memory();
5961            let session = db.session();
5962
5963            let id = session.create_node(&["Person"]);
5964
5965            assert!(session.node_exists(id));
5966            assert!(!session.node_exists(NodeId::new(9999)));
5967        }
5968
5969        #[test]
5970        fn test_edge_exists() {
5971            use grafeo_common::types::EdgeId;
5972
5973            let db = GrafeoDB::new_in_memory();
5974            let session = db.session();
5975
5976            let alix = session.create_node(&["Person"]);
5977            let gus = session.create_node(&["Person"]);
5978            let edge_id = session.create_edge(alix, gus, "KNOWS");
5979
5980            assert!(session.edge_exists(edge_id));
5981            assert!(!session.edge_exists(EdgeId::new(9999)));
5982        }
5983
5984        #[test]
5985        fn test_get_degree() {
5986            let db = GrafeoDB::new_in_memory();
5987            let session = db.session();
5988
5989            let alix = session.create_node(&["Person"]);
5990            let gus = session.create_node(&["Person"]);
5991            let harm = session.create_node(&["Person"]);
5992
5993            // Alix knows Gus and Harm (2 outgoing)
5994            session.create_edge(alix, gus, "KNOWS");
5995            session.create_edge(alix, harm, "KNOWS");
5996            // Gus knows Alix (1 incoming for Alix)
5997            session.create_edge(gus, alix, "KNOWS");
5998
5999            let (out_degree, in_degree) = session.get_degree(alix);
6000            assert_eq!(out_degree, 2);
6001            assert_eq!(in_degree, 1);
6002
6003            // Node with no edges
6004            let lonely = session.create_node(&["Person"]);
6005            let (out, in_deg) = session.get_degree(lonely);
6006            assert_eq!(out, 0);
6007            assert_eq!(in_deg, 0);
6008        }
6009
6010        #[test]
6011        fn test_get_nodes_batch() {
6012            let db = GrafeoDB::new_in_memory();
6013            let session = db.session();
6014
6015            let alix = session.create_node(&["Person"]);
6016            let gus = session.create_node(&["Person"]);
6017            let harm = session.create_node(&["Person"]);
6018
6019            let nodes = session.get_nodes_batch(&[alix, gus, harm]);
6020            assert_eq!(nodes.len(), 3);
6021            assert!(nodes[0].is_some());
6022            assert!(nodes[1].is_some());
6023            assert!(nodes[2].is_some());
6024
6025            // With non-existent node
6026            use grafeo_common::types::NodeId;
6027            let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
6028            assert_eq!(nodes_with_missing.len(), 3);
6029            assert!(nodes_with_missing[0].is_some());
6030            assert!(nodes_with_missing[1].is_none()); // Missing node
6031            assert!(nodes_with_missing[2].is_some());
6032        }
6033
6034        #[test]
6035        fn test_auto_commit_setting() {
6036            let db = GrafeoDB::new_in_memory();
6037            let mut session = db.session();
6038
6039            // Default is auto-commit enabled
6040            assert!(session.auto_commit());
6041
6042            session.set_auto_commit(false);
6043            assert!(!session.auto_commit());
6044
6045            session.set_auto_commit(true);
6046            assert!(session.auto_commit());
6047        }
6048
6049        #[test]
6050        fn test_transaction_double_begin_nests() {
6051            let db = GrafeoDB::new_in_memory();
6052            let mut session = db.session();
6053
6054            session.begin_transaction().unwrap();
6055            // Second begin_transaction creates a nested transaction (auto-savepoint)
6056            let result = session.begin_transaction();
6057            assert!(result.is_ok());
6058            // Commit the inner (releases savepoint)
6059            session.commit().unwrap();
6060            // Commit the outer
6061            session.commit().unwrap();
6062        }
6063
6064        #[test]
6065        fn test_commit_without_transaction_error() {
6066            let db = GrafeoDB::new_in_memory();
6067            let mut session = db.session();
6068
6069            let result = session.commit();
6070            assert!(result.is_err());
6071        }
6072
6073        #[test]
6074        fn test_rollback_without_transaction_error() {
6075            let db = GrafeoDB::new_in_memory();
6076            let mut session = db.session();
6077
6078            let result = session.rollback();
6079            assert!(result.is_err());
6080        }
6081
6082        #[test]
6083        fn test_create_edge_in_transaction() {
6084            let db = GrafeoDB::new_in_memory();
6085            let mut session = db.session();
6086
6087            // Create nodes outside transaction
6088            let alix = session.create_node(&["Person"]);
6089            let gus = session.create_node(&["Person"]);
6090
6091            // Create edge in transaction
6092            session.begin_transaction().unwrap();
6093            let edge_id = session.create_edge(alix, gus, "KNOWS");
6094
6095            // Edge should be visible in the transaction
6096            assert!(session.edge_exists(edge_id));
6097
6098            // Commit
6099            session.commit().unwrap();
6100
6101            // Edge should still be visible
6102            assert!(session.edge_exists(edge_id));
6103        }
6104
6105        #[test]
6106        fn test_neighbors_empty_node() {
6107            let db = GrafeoDB::new_in_memory();
6108            let session = db.session();
6109
6110            let lonely = session.create_node(&["Person"]);
6111
6112            assert!(session.get_neighbors_outgoing(lonely).is_empty());
6113            assert!(session.get_neighbors_incoming(lonely).is_empty());
6114            assert!(
6115                session
6116                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
6117                    .is_empty()
6118            );
6119        }
6120    }
6121
6122    #[test]
6123    fn test_auto_gc_triggers_on_commit_interval() {
6124        use crate::config::Config;
6125
6126        let config = Config::in_memory().with_gc_interval(2);
6127        let db = GrafeoDB::with_config(config).unwrap();
6128        let mut session = db.session();
6129
6130        // First commit: counter = 1, no GC (not a multiple of 2)
6131        session.begin_transaction().unwrap();
6132        session.create_node(&["A"]);
6133        session.commit().unwrap();
6134
6135        // Second commit: counter = 2, GC should trigger (multiple of 2)
6136        session.begin_transaction().unwrap();
6137        session.create_node(&["B"]);
6138        session.commit().unwrap();
6139
6140        // Verify the database is still functional after GC
6141        assert_eq!(db.node_count(), 2);
6142    }
6143
6144    #[test]
6145    fn test_query_timeout_config_propagates_to_session() {
6146        use crate::config::Config;
6147        use std::time::Duration;
6148
6149        let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
6150        let db = GrafeoDB::with_config(config).unwrap();
6151        let session = db.session();
6152
6153        // Verify the session has a query deadline (timeout was set)
6154        assert!(session.query_deadline().is_some());
6155    }
6156
6157    #[test]
6158    fn test_default_query_timeout_returns_deadline() {
6159        let db = GrafeoDB::new_in_memory();
6160        let session = db.session();
6161
6162        // Default config has 30s timeout
6163        assert!(session.query_deadline().is_some());
6164    }
6165
6166    #[test]
6167    fn test_no_query_timeout_returns_no_deadline() {
6168        use crate::config::Config;
6169
6170        let config = Config::in_memory().without_query_timeout();
6171        let db = GrafeoDB::with_config(config).unwrap();
6172        let session = db.session();
6173
6174        assert!(session.query_deadline().is_none());
6175    }
6176
6177    #[test]
6178    fn test_graph_model_accessor() {
6179        use crate::config::GraphModel;
6180
6181        let db = GrafeoDB::new_in_memory();
6182        let session = db.session();
6183
6184        assert_eq!(session.graph_model(), GraphModel::Lpg);
6185    }
6186
6187    #[test]
6188    fn test_reject_oversized_property() {
6189        use crate::config::Config;
6190
6191        let config = Config::in_memory().with_max_property_size(100);
6192        let db = GrafeoDB::with_config(config).unwrap();
6193        let session = db.session();
6194
6195        let node = session.create_node(&["Test"]);
6196
6197        // Small property should succeed
6198        session
6199            .set_node_property(node, "small", Value::from("hello"))
6200            .unwrap();
6201
6202        // Large property should be rejected
6203        let big = "x".repeat(200);
6204        let result = session.set_node_property(node, "big", Value::from(big.as_str()));
6205        assert!(result.is_err());
6206        let err = result.unwrap_err().to_string();
6207        assert!(
6208            err.contains("exceeds maximum size"),
6209            "Expected size error, got: {err}"
6210        );
6211    }
6212
6213    #[test]
6214    fn test_no_property_size_limit() {
6215        use crate::config::Config;
6216
6217        let config = Config::in_memory().without_max_property_size();
6218        let db = GrafeoDB::with_config(config).unwrap();
6219        let session = db.session();
6220
6221        let node = session.create_node(&["Test"]);
6222
6223        // Even large properties should succeed with no limit
6224        let big = "x".repeat(10_000);
6225        session
6226            .set_node_property(node, "big", Value::from(big.as_str()))
6227            .unwrap();
6228    }
6229
6230    #[cfg(feature = "gql")]
6231    #[test]
6232    fn test_external_store_session() {
6233        use grafeo_core::graph::GraphStoreMut;
6234        use std::sync::Arc;
6235
6236        let config = crate::config::Config::in_memory();
6237        let store =
6238            Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
6239        let db = GrafeoDB::with_store(store, config).unwrap();
6240
6241        let mut session = db.session();
6242
6243        // Use an explicit transaction so that INSERT and MATCH share the same
6244        // transaction context. With PENDING epochs, uncommitted versions are
6245        // only visible to the owning transaction.
6246        session.begin_transaction().unwrap();
6247        session.execute("INSERT (:Test {name: 'hello'})").unwrap();
6248
6249        // Verify we can query through it within the same transaction
6250        let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
6251        assert_eq!(result.row_count(), 1);
6252
6253        session.commit().unwrap();
6254    }
6255
6256    // ==================== Session Command Tests ====================
6257
6258    #[cfg(feature = "gql")]
6259    mod session_command_tests {
6260        use super::*;
6261        use grafeo_common::types::Value;
6262
6263        #[test]
6264        fn test_use_graph_sets_current_graph() {
6265            let db = GrafeoDB::new_in_memory();
6266            let session = db.session();
6267
6268            // Create the graph first, then USE it
6269            session.execute("CREATE GRAPH mydb").unwrap();
6270            session.execute("USE GRAPH mydb").unwrap();
6271
6272            assert_eq!(session.current_graph(), Some("mydb".to_string()));
6273        }
6274
6275        #[test]
6276        fn test_use_graph_nonexistent_errors() {
6277            let db = GrafeoDB::new_in_memory();
6278            let session = db.session();
6279
6280            let result = session.execute("USE GRAPH doesnotexist");
6281            assert!(result.is_err());
6282            let err = result.unwrap_err().to_string();
6283            assert!(
6284                err.contains("does not exist"),
6285                "Expected 'does not exist' error, got: {err}"
6286            );
6287        }
6288
6289        #[test]
6290        fn test_use_graph_default_always_valid() {
6291            let db = GrafeoDB::new_in_memory();
6292            let session = db.session();
6293
6294            // "default" is always valid, even without CREATE GRAPH
6295            session.execute("USE GRAPH default").unwrap();
6296            assert_eq!(session.current_graph(), Some("default".to_string()));
6297        }
6298
6299        #[test]
6300        fn test_session_set_graph() {
6301            let db = GrafeoDB::new_in_memory();
6302            let session = db.session();
6303
6304            session.execute("CREATE GRAPH analytics").unwrap();
6305            session.execute("SESSION SET GRAPH analytics").unwrap();
6306            assert_eq!(session.current_graph(), Some("analytics".to_string()));
6307        }
6308
6309        #[test]
6310        fn test_session_set_graph_nonexistent_errors() {
6311            let db = GrafeoDB::new_in_memory();
6312            let session = db.session();
6313
6314            let result = session.execute("SESSION SET GRAPH nosuchgraph");
6315            assert!(result.is_err());
6316        }
6317
6318        #[test]
6319        fn test_session_set_time_zone() {
6320            let db = GrafeoDB::new_in_memory();
6321            let session = db.session();
6322
6323            assert_eq!(session.time_zone(), None);
6324
6325            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
6326            assert_eq!(session.time_zone(), Some("UTC".to_string()));
6327
6328            session
6329                .execute("SESSION SET TIME ZONE 'America/New_York'")
6330                .unwrap();
6331            assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
6332        }
6333
6334        #[test]
6335        fn test_session_set_parameter() {
6336            let db = GrafeoDB::new_in_memory();
6337            let session = db.session();
6338
6339            session
6340                .execute("SESSION SET PARAMETER $timeout = 30")
6341                .unwrap();
6342
6343            // Parameter is stored (value is Null for now, since expression
6344            // evaluation is not yet wired up)
6345            assert!(session.get_parameter("timeout").is_some());
6346        }
6347
6348        #[test]
6349        fn test_session_reset_clears_all_state() {
6350            let db = GrafeoDB::new_in_memory();
6351            let session = db.session();
6352
6353            // Set various session state
6354            session.execute("CREATE GRAPH analytics").unwrap();
6355            session.execute("SESSION SET GRAPH analytics").unwrap();
6356            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
6357            session
6358                .execute("SESSION SET PARAMETER $limit = 100")
6359                .unwrap();
6360
6361            // Verify state was set
6362            assert!(session.current_graph().is_some());
6363            assert!(session.time_zone().is_some());
6364            assert!(session.get_parameter("limit").is_some());
6365
6366            // Reset everything
6367            session.execute("SESSION RESET").unwrap();
6368
6369            assert_eq!(session.current_graph(), None);
6370            assert_eq!(session.time_zone(), None);
6371            assert!(session.get_parameter("limit").is_none());
6372        }
6373
6374        #[test]
6375        fn test_session_close_clears_state() {
6376            let db = GrafeoDB::new_in_memory();
6377            let session = db.session();
6378
6379            session.execute("CREATE GRAPH analytics").unwrap();
6380            session.execute("SESSION SET GRAPH analytics").unwrap();
6381            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
6382
6383            session.execute("SESSION CLOSE").unwrap();
6384
6385            assert_eq!(session.current_graph(), None);
6386            assert_eq!(session.time_zone(), None);
6387        }
6388
6389        #[test]
6390        fn test_create_graph() {
6391            let db = GrafeoDB::new_in_memory();
6392            let session = db.session();
6393
6394            session.execute("CREATE GRAPH mydb").unwrap();
6395
6396            // Should be able to USE it now
6397            session.execute("USE GRAPH mydb").unwrap();
6398            assert_eq!(session.current_graph(), Some("mydb".to_string()));
6399        }
6400
6401        #[test]
6402        fn test_create_graph_duplicate_errors() {
6403            let db = GrafeoDB::new_in_memory();
6404            let session = db.session();
6405
6406            session.execute("CREATE GRAPH mydb").unwrap();
6407            let result = session.execute("CREATE GRAPH mydb");
6408
6409            assert!(result.is_err());
6410            let err = result.unwrap_err().to_string();
6411            assert!(
6412                err.contains("already exists"),
6413                "Expected 'already exists' error, got: {err}"
6414            );
6415        }
6416
6417        #[test]
6418        fn test_create_graph_if_not_exists() {
6419            let db = GrafeoDB::new_in_memory();
6420            let session = db.session();
6421
6422            session.execute("CREATE GRAPH mydb").unwrap();
6423            // Should succeed silently with IF NOT EXISTS
6424            session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
6425        }
6426
6427        #[test]
6428        fn test_drop_graph() {
6429            let db = GrafeoDB::new_in_memory();
6430            let session = db.session();
6431
6432            session.execute("CREATE GRAPH mydb").unwrap();
6433            session.execute("DROP GRAPH mydb").unwrap();
6434
6435            // Should no longer be usable
6436            let result = session.execute("USE GRAPH mydb");
6437            assert!(result.is_err());
6438        }
6439
6440        #[test]
6441        fn test_drop_graph_nonexistent_errors() {
6442            let db = GrafeoDB::new_in_memory();
6443            let session = db.session();
6444
6445            let result = session.execute("DROP GRAPH nosuchgraph");
6446            assert!(result.is_err());
6447            let err = result.unwrap_err().to_string();
6448            assert!(
6449                err.contains("does not exist"),
6450                "Expected 'does not exist' error, got: {err}"
6451            );
6452        }
6453
6454        #[test]
6455        fn test_drop_graph_if_exists() {
6456            let db = GrafeoDB::new_in_memory();
6457            let session = db.session();
6458
6459            // Should succeed silently with IF EXISTS
6460            session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
6461        }
6462
6463        #[test]
6464        fn test_start_transaction_via_gql() {
6465            let db = GrafeoDB::new_in_memory();
6466            let session = db.session();
6467
6468            session.execute("START TRANSACTION").unwrap();
6469            assert!(session.in_transaction());
6470            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
6471            session.execute("COMMIT").unwrap();
6472            assert!(!session.in_transaction());
6473
6474            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
6475            assert_eq!(result.rows.len(), 1);
6476        }
6477
6478        #[test]
6479        fn test_start_transaction_read_only_blocks_insert() {
6480            let db = GrafeoDB::new_in_memory();
6481            let session = db.session();
6482
6483            session.execute("START TRANSACTION READ ONLY").unwrap();
6484            let result = session.execute("INSERT (:Person {name: 'Alix'})");
6485            assert!(result.is_err());
6486            let err = result.unwrap_err().to_string();
6487            assert!(
6488                err.contains("read-only"),
6489                "Expected read-only error, got: {err}"
6490            );
6491            session.execute("ROLLBACK").unwrap();
6492        }
6493
6494        #[test]
6495        fn test_start_transaction_read_only_allows_reads() {
6496            let db = GrafeoDB::new_in_memory();
6497            let mut session = db.session();
6498            session.begin_transaction().unwrap();
6499            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
6500            session.commit().unwrap();
6501
6502            session.execute("START TRANSACTION READ ONLY").unwrap();
6503            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
6504            assert_eq!(result.rows.len(), 1);
6505            session.execute("COMMIT").unwrap();
6506        }
6507
6508        #[test]
6509        fn test_rollback_via_gql() {
6510            let db = GrafeoDB::new_in_memory();
6511            let session = db.session();
6512
6513            session.execute("START TRANSACTION").unwrap();
6514            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
6515            session.execute("ROLLBACK").unwrap();
6516
6517            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
6518            assert!(result.rows.is_empty());
6519        }
6520
6521        #[test]
6522        fn test_start_transaction_with_isolation_level() {
6523            let db = GrafeoDB::new_in_memory();
6524            let session = db.session();
6525
6526            session
6527                .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
6528                .unwrap();
6529            assert!(session.in_transaction());
6530            session.execute("ROLLBACK").unwrap();
6531        }
6532
6533        #[test]
6534        fn test_session_commands_return_empty_result() {
6535            let db = GrafeoDB::new_in_memory();
6536            let session = db.session();
6537
6538            session.execute("CREATE GRAPH test").unwrap();
6539            let result = session.execute("SESSION SET GRAPH test").unwrap();
6540            assert_eq!(result.row_count(), 0);
6541            assert_eq!(result.column_count(), 0);
6542        }
6543
6544        #[test]
6545        fn test_current_graph_default_is_none() {
6546            let db = GrafeoDB::new_in_memory();
6547            let session = db.session();
6548
6549            assert_eq!(session.current_graph(), None);
6550        }
6551
6552        #[test]
6553        fn test_time_zone_default_is_none() {
6554            let db = GrafeoDB::new_in_memory();
6555            let session = db.session();
6556
6557            assert_eq!(session.time_zone(), None);
6558        }
6559
6560        #[test]
6561        fn test_session_state_independent_across_sessions() {
6562            let db = GrafeoDB::new_in_memory();
6563            let session1 = db.session();
6564            let session2 = db.session();
6565
6566            session1.execute("CREATE GRAPH first").unwrap();
6567            session1.execute("CREATE GRAPH second").unwrap();
6568            session1.execute("SESSION SET GRAPH first").unwrap();
6569            session2.execute("SESSION SET GRAPH second").unwrap();
6570
6571            assert_eq!(session1.current_graph(), Some("first".to_string()));
6572            assert_eq!(session2.current_graph(), Some("second".to_string()));
6573        }
6574
6575        #[test]
6576        fn test_show_node_types() {
6577            let db = GrafeoDB::new_in_memory();
6578            let session = db.session();
6579
6580            session
6581                .execute("CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)")
6582                .unwrap();
6583
6584            let result = session.execute("SHOW NODE TYPES").unwrap();
6585            assert_eq!(
6586                result.columns,
6587                vec!["name", "properties", "constraints", "parents"]
6588            );
6589            assert_eq!(result.rows.len(), 1);
6590            // First column is the type name
6591            assert_eq!(result.rows[0][0], Value::from("Person"));
6592        }
6593
6594        #[test]
6595        fn test_show_edge_types() {
6596            let db = GrafeoDB::new_in_memory();
6597            let session = db.session();
6598
6599            session
6600                .execute("CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)")
6601                .unwrap();
6602
6603            let result = session.execute("SHOW EDGE TYPES").unwrap();
6604            assert_eq!(
6605                result.columns,
6606                vec!["name", "properties", "source_types", "target_types"]
6607            );
6608            assert_eq!(result.rows.len(), 1);
6609            assert_eq!(result.rows[0][0], Value::from("KNOWS"));
6610        }
6611
6612        #[test]
6613        fn test_show_graph_types() {
6614            let db = GrafeoDB::new_in_memory();
6615            let session = db.session();
6616
6617            session
6618                .execute("CREATE NODE TYPE Person (name STRING)")
6619                .unwrap();
6620            session
6621                .execute(
6622                    "CREATE GRAPH TYPE social (\
6623                        NODE TYPE Person (name STRING)\
6624                    )",
6625                )
6626                .unwrap();
6627
6628            let result = session.execute("SHOW GRAPH TYPES").unwrap();
6629            assert_eq!(
6630                result.columns,
6631                vec!["name", "open", "node_types", "edge_types"]
6632            );
6633            assert_eq!(result.rows.len(), 1);
6634            assert_eq!(result.rows[0][0], Value::from("social"));
6635        }
6636
6637        #[test]
6638        fn test_show_graph_type_named() {
6639            let db = GrafeoDB::new_in_memory();
6640            let session = db.session();
6641
6642            session
6643                .execute("CREATE NODE TYPE Person (name STRING)")
6644                .unwrap();
6645            session
6646                .execute(
6647                    "CREATE GRAPH TYPE social (\
6648                        NODE TYPE Person (name STRING)\
6649                    )",
6650                )
6651                .unwrap();
6652
6653            let result = session.execute("SHOW GRAPH TYPE social").unwrap();
6654            assert_eq!(result.rows.len(), 1);
6655            assert_eq!(result.rows[0][0], Value::from("social"));
6656        }
6657
6658        #[test]
6659        fn test_show_graph_type_not_found() {
6660            let db = GrafeoDB::new_in_memory();
6661            let session = db.session();
6662
6663            let result = session.execute("SHOW GRAPH TYPE nonexistent");
6664            assert!(result.is_err());
6665        }
6666
6667        #[test]
6668        fn test_show_indexes_via_gql() {
6669            let db = GrafeoDB::new_in_memory();
6670            let session = db.session();
6671
6672            let result = session.execute("SHOW INDEXES").unwrap();
6673            assert_eq!(result.columns, vec!["name", "type", "label", "property"]);
6674        }
6675
6676        #[test]
6677        fn test_show_constraints_via_gql() {
6678            let db = GrafeoDB::new_in_memory();
6679            let session = db.session();
6680
6681            let result = session.execute("SHOW CONSTRAINTS").unwrap();
6682            assert_eq!(result.columns, vec!["name", "type", "label", "properties"]);
6683        }
6684
6685        #[test]
6686        fn test_pattern_form_graph_type_roundtrip() {
6687            let db = GrafeoDB::new_in_memory();
6688            let session = db.session();
6689
6690            // Register the types first
6691            session
6692                .execute("CREATE NODE TYPE Person (name STRING NOT NULL)")
6693                .unwrap();
6694            session
6695                .execute("CREATE NODE TYPE City (name STRING)")
6696                .unwrap();
6697            session
6698                .execute("CREATE EDGE TYPE KNOWS (since INTEGER)")
6699                .unwrap();
6700            session.execute("CREATE EDGE TYPE LIVES_IN").unwrap();
6701
6702            // Create graph type using pattern form
6703            session
6704                .execute(
6705                    "CREATE GRAPH TYPE social (\
6706                        (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
6707                        (:Person)-[:LIVES_IN]->(:City)\
6708                    )",
6709                )
6710                .unwrap();
6711
6712            // Verify it was created
6713            let result = session.execute("SHOW GRAPH TYPE social").unwrap();
6714            assert_eq!(result.rows.len(), 1);
6715            assert_eq!(result.rows[0][0], Value::from("social"));
6716        }
6717    }
6718}