// grafeo_engine/session/mod.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7#[cfg(feature = "triple-store")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::AtomicUsize;
12#[cfg(feature = "lpg")]
13use std::sync::atomic::Ordering;
14use std::time::{Duration, Instant};
15
16#[cfg(feature = "lpg")]
17use grafeo_common::grafeo_debug_span;
18#[cfg(feature = "lpg")]
19use grafeo_common::types::{EdgeId, NodeId};
20use grafeo_common::types::{EpochId, TransactionId, Value};
21use grafeo_common::utils::error::Result;
22use grafeo_common::{grafeo_info_span, grafeo_warn};
23#[cfg(feature = "lpg")]
24use grafeo_core::graph::Direction;
25#[cfg(feature = "lpg")]
26use grafeo_core::graph::lpg::LpgStore;
27#[cfg(feature = "lpg")]
28use grafeo_core::graph::lpg::{Edge, Node};
29#[cfg(feature = "triple-store")]
30use grafeo_core::graph::rdf::RdfStore;
31use grafeo_core::graph::{GraphStore, GraphStoreMut, GraphStoreSearch};
32
33use crate::catalog::{Catalog, CatalogConstraintValidator};
34use crate::config::{AdaptiveConfig, GraphModel};
35use crate::database::QueryResult;
36use crate::query::Executor;
37use crate::query::cache::QueryCache;
38use crate::transaction::TransactionManager;
39
40/// Storage key suffix for the implicit default graph within a schema.
41/// Auto-created by `CREATE SCHEMA` and auto-dropped by `DROP SCHEMA`.
42const SCHEMA_DEFAULT_GRAPH: &str = "__default__";
43
44/// Parses a DDL default-value literal string into a [`Value`].
45///
46/// Handles string literals (single- or double-quoted), integers, floats,
47/// booleans (`true`/`false`), and `NULL`.
48fn parse_default_literal(text: &str) -> Value {
49    if text.eq_ignore_ascii_case("null") {
50        return Value::Null;
51    }
52    if text.eq_ignore_ascii_case("true") {
53        return Value::Bool(true);
54    }
55    if text.eq_ignore_ascii_case("false") {
56        return Value::Bool(false);
57    }
58    // String literal: strip surrounding quotes
59    if (text.starts_with('\'') && text.ends_with('\''))
60        || (text.starts_with('"') && text.ends_with('"'))
61    {
62        return Value::String(text[1..text.len() - 1].into());
63    }
64    // Try integer, then float
65    if let Ok(i) = text.parse::<i64>() {
66        return Value::Int64(i);
67    }
68    if let Ok(f) = text.parse::<f64>() {
69        return Value::Float64(f);
70    }
71    // Fallback: treat as string
72    Value::String(text.into())
73}
74
75/// Runtime configuration for creating a new session.
76///
77/// Groups the shared parameters passed to all session constructors, keeping
78/// call sites readable and avoiding long argument lists.
79pub(crate) struct SessionConfig {
80    pub transaction_manager: Arc<TransactionManager>,
81    pub query_cache: Arc<QueryCache>,
82    pub catalog: Arc<Catalog>,
83    pub adaptive_config: AdaptiveConfig,
84    pub factorized_execution: bool,
85    pub graph_model: GraphModel,
86    pub query_timeout: Option<Duration>,
87    pub max_property_size: Option<usize>,
88    /// Buffer manager for memory-aware query execution.
89    pub buffer_manager: Option<Arc<grafeo_common::memory::buffer::BufferManager>>,
90    pub commit_counter: Arc<AtomicUsize>,
91    pub gc_interval: usize,
92    /// When true, the session permanently blocks all mutations.
93    pub read_only: bool,
94    /// The identity bound to this session (for permission checks).
95    pub identity: crate::auth::Identity,
96    /// Named graph projections shared with the database.
97    #[cfg(feature = "lpg")]
98    pub projections: Arc<
99        parking_lot::RwLock<
100            std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>,
101        >,
102    >,
103}
104
105/// Your handle to the database - execute queries and manage transactions.
106///
107/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
108/// tracks its own transaction state, so you can have multiple concurrent
109/// sessions without them interfering.
110pub struct Session {
111    /// The underlying store.
112    #[cfg(feature = "lpg")]
113    store: Arc<LpgStore>,
114    /// Graph store trait object for pluggable storage backends (read path).
115    graph_store: Arc<dyn GraphStoreSearch>,
116    /// Writable graph store (None for read-only databases).
117    graph_store_mut: Option<Arc<dyn GraphStoreMut>>,
118    /// Schema and metadata catalog shared across sessions.
119    catalog: Arc<Catalog>,
120    /// RDF triple store (if RDF feature is enabled).
121    #[cfg(feature = "triple-store")]
122    rdf_store: Arc<RdfStore>,
123    /// Transaction manager.
124    transaction_manager: Arc<TransactionManager>,
125    /// Query cache shared across sessions.
126    query_cache: Arc<QueryCache>,
127    /// Current transaction ID (if any). Behind a Mutex so that GQL commands
128    /// (`START TRANSACTION`, `COMMIT`, `ROLLBACK`) can manage transactions
129    /// from within `execute(&self)`.
130    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
131    /// Whether the current transaction is read-only (blocks mutations).
132    read_only_tx: parking_lot::Mutex<bool>,
133    /// Whether the database itself is read-only (set at open time, never changes).
134    /// When true, `read_only_tx` is always true regardless of transaction flags.
135    db_read_only: bool,
136    /// The identity bound to this session (determines permission level).
137    identity: crate::auth::Identity,
138    /// Whether the session is in auto-commit mode.
139    auto_commit: bool,
140    /// Adaptive execution configuration.
141    #[allow(dead_code)] // Stored for future adaptive re-optimization during execution
142    adaptive_config: AdaptiveConfig,
143    /// Whether to use factorized execution for multi-hop queries.
144    factorized_execution: bool,
145    /// The graph data model this session operates on.
146    graph_model: GraphModel,
147    /// Maximum time a query may run before being cancelled.
148    query_timeout: Option<Duration>,
149    /// Maximum size in bytes for a single property value.
150    max_property_size: Option<usize>,
151    /// Buffer manager for memory-aware execution (spill decisions).
152    buffer_manager: Option<Arc<grafeo_common::memory::buffer::BufferManager>>,
153    /// Shared commit counter for triggering auto-GC.
154    commit_counter: Arc<AtomicUsize>,
155    /// GC every N commits (0 = disabled).
156    gc_interval: usize,
157    /// Node count at the start of the current transaction (for PreparedCommit stats).
158    transaction_start_node_count: AtomicUsize,
159    /// Edge count at the start of the current transaction (for PreparedCommit stats).
160    transaction_start_edge_count: AtomicUsize,
161    /// WAL for logging schema changes.
162    #[cfg(feature = "wal")]
163    wal: Option<Arc<grafeo_storage::wal::LpgWal>>,
164    /// Shared WAL graph context tracker for named graph awareness.
165    #[cfg(feature = "wal")]
166    wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
167    /// CDC log for change tracking.
168    #[cfg(feature = "cdc")]
169    cdc_log: Arc<crate::cdc::CdcLog>,
170    /// Buffered CDC events for the current transaction.
171    /// Flushed to `cdc_log` on commit, discarded on rollback.
172    #[cfg(feature = "cdc")]
173    cdc_pending_events: Option<Arc<parking_lot::Mutex<Vec<crate::cdc::ChangeEvent>>>>,
174    /// Current graph name (for multi-graph USE GRAPH support). None = default graph.
175    current_graph: parking_lot::Mutex<Option<String>>,
176    /// Current schema name (ISO/IEC 39075 Section 4.7.3: independent from session graph).
177    /// None = "not set" (uses default schema).
178    current_schema: parking_lot::Mutex<Option<String>>,
179    /// Session time zone override.
180    time_zone: parking_lot::Mutex<Option<String>>,
181    /// Session-level parameters (SET PARAMETER).
182    session_params:
183        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
184    /// Override epoch for time-travel queries (None = use transaction/current epoch).
185    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
186    /// Savepoints within the current transaction.
187    savepoints: parking_lot::Mutex<Vec<SavepointState>>,
188    /// Nesting depth for nested transactions (0 = outermost).
189    /// Nested `START TRANSACTION` creates an auto-savepoint; nested `COMMIT`
190    /// releases it, nested `ROLLBACK` rolls back to it.
191    transaction_nesting_depth: parking_lot::Mutex<u32>,
192    /// Named graphs touched during the current transaction (for cross-graph atomicity).
193    /// `None` represents the default graph. Populated at `BEGIN` time and on each
194    /// `USE GRAPH` / `SESSION SET GRAPH` switch within a transaction.
195    touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
196    /// Count of active `ResultStream`s pinned to this session. Commit and
197    /// rollback block while any streams are outstanding so mid-iteration
198    /// snapshots are not invalidated.
199    active_streams: AtomicUsize,
200    /// Shared metrics registry (populated when the `metrics` feature is enabled).
201    #[cfg(feature = "metrics")]
202    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
203    /// Transaction start time for duration tracking.
204    #[cfg(feature = "metrics")]
205    tx_start_time: parking_lot::Mutex<Option<Instant>>,
206    /// Named graph projections shared with the database.
207    #[cfg(feature = "lpg")]
208    projections: Arc<
209        parking_lot::RwLock<
210            std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>,
211        >,
212    >,
213}
214
/// Snapshot of one graph's store state, taken when a savepoint is created.
#[derive(Clone)]
struct GraphSavepoint {
    /// Graph this snapshot belongs to.
    graph_name: Option<String>,
    /// Value of the store's next node ID at savepoint time.
    next_node_id: u64,
    /// Value of the store's next edge ID at savepoint time.
    next_edge_id: u64,
    /// Position in the undo log at savepoint time.
    undo_log_position: usize,
}
223
224/// Savepoint state: name + per-graph snapshots + the graph that was active.
225#[derive(Clone)]
226struct SavepointState {
227    name: String,
228    graph_snapshots: Vec<GraphSavepoint>,
229    /// The graph that was active when the savepoint was created.
230    /// Reserved for future use (e.g., restoring graph context on rollback).
231    #[allow(dead_code)]
232    active_graph: Option<String>,
233    /// CDC event buffer position at savepoint creation.
234    /// On rollback-to-savepoint, the buffer is truncated to this position.
235    #[cfg(feature = "cdc")]
236    cdc_event_position: usize,
237}
238
239impl Session {
240    /// Creates a new session with adaptive execution configuration.
241    #[cfg(feature = "lpg")]
242    #[allow(dead_code)] // Used when lpg enabled without triple-store
243    pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
244        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreSearch>;
245        let graph_store_mut = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
246        Self {
247            store,
248            graph_store,
249            graph_store_mut,
250            catalog: cfg.catalog,
251            #[cfg(feature = "triple-store")]
252            rdf_store: Arc::new(RdfStore::new()),
253            transaction_manager: cfg.transaction_manager,
254            query_cache: cfg.query_cache,
255            current_transaction: parking_lot::Mutex::new(None),
256            read_only_tx: parking_lot::Mutex::new(cfg.read_only),
257            db_read_only: cfg.read_only,
258            identity: cfg.identity,
259            auto_commit: true,
260            adaptive_config: cfg.adaptive_config,
261            factorized_execution: cfg.factorized_execution,
262            graph_model: cfg.graph_model,
263            query_timeout: cfg.query_timeout,
264            max_property_size: cfg.max_property_size,
265            buffer_manager: cfg.buffer_manager,
266            commit_counter: cfg.commit_counter,
267            gc_interval: cfg.gc_interval,
268            transaction_start_node_count: AtomicUsize::new(0),
269            transaction_start_edge_count: AtomicUsize::new(0),
270            #[cfg(feature = "wal")]
271            wal: None,
272            #[cfg(feature = "wal")]
273            wal_graph_context: None,
274            #[cfg(feature = "cdc")]
275            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
276            #[cfg(feature = "cdc")]
277            cdc_pending_events: None,
278            current_graph: parking_lot::Mutex::new(None),
279            current_schema: parking_lot::Mutex::new(None),
280            time_zone: parking_lot::Mutex::new(None),
281            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
282            viewing_epoch_override: parking_lot::Mutex::new(None),
283            savepoints: parking_lot::Mutex::new(Vec::new()),
284            transaction_nesting_depth: parking_lot::Mutex::new(0),
285            touched_graphs: parking_lot::Mutex::new(Vec::new()),
286            active_streams: AtomicUsize::new(0),
287            #[cfg(feature = "metrics")]
288            metrics: None,
289            #[cfg(feature = "metrics")]
290            tx_start_time: parking_lot::Mutex::new(None),
291            projections: cfg.projections,
292        }
293    }
294
295    /// Overrides the graph store and write store used by the query engine.
296    ///
297    /// Used by the layered store integration: the session's `store` field is
298    /// the overlay `LpgStore` (for MVCC), but reads and writes should route
299    /// through the `LayeredStore` (which merges base + overlay).
300    #[cfg(all(feature = "compact-store", feature = "lpg"))]
301    pub(crate) fn override_stores(
302        &mut self,
303        read_store: Arc<dyn GraphStoreSearch>,
304        write_store: Option<Arc<dyn GraphStoreMut>>,
305    ) {
306        self.graph_store = read_store;
307        self.graph_store_mut = write_store;
308    }
309
310    /// Sets the WAL for this session (shared with the database).
311    ///
312    /// This also wraps `graph_store` in a [`WalGraphStore`] so that mutation
313    /// operators (INSERT, DELETE, SET via queries) log to the WAL.
314    #[cfg(all(feature = "wal", feature = "lpg"))]
315    pub(crate) fn set_wal(
316        &mut self,
317        wal: Arc<grafeo_storage::wal::LpgWal>,
318        wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
319    ) {
320        // Wrap the graph store so query-engine mutations are WAL-logged
321        let wal_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
322            Arc::clone(&self.store),
323            Arc::clone(&wal),
324            Arc::clone(&wal_graph_context),
325        ));
326        self.graph_store = Arc::clone(&wal_store) as Arc<dyn GraphStoreSearch>;
327        self.graph_store_mut = Some(wal_store as Arc<dyn GraphStoreMut>);
328        self.wal = Some(wal);
329        self.wal_graph_context = Some(wal_graph_context);
330    }
331
332    /// Sets the CDC log for this session (shared with the database).
333    ///
334    /// Wraps the current write store with a `CdcGraphStore` decorator so
335    /// that all session mutations (INSERT, SET, DELETE via query execution)
336    /// buffer CDC events. The buffer is flushed to the `CdcLog` on commit
337    /// and discarded on rollback.
338    #[cfg(feature = "cdc")]
339    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
340        // Wrap the WRITE store only with CdcGraphStore to intercept mutations.
341        // The read store (self.graph_store) is left unchanged for zero read overhead.
342        if let Some(ref write_store) = self.graph_store_mut {
343            let cdc_store = Arc::new(crate::database::cdc_store::CdcGraphStore::new(
344                Arc::clone(write_store),
345                Arc::clone(&cdc_log),
346            ));
347            self.cdc_pending_events = Some(cdc_store.pending_events());
348            self.graph_store_mut = Some(cdc_store as Arc<dyn grafeo_core::graph::GraphStoreMut>);
349        }
350        self.cdc_log = cdc_log;
351    }
352
353    /// Sets the metrics registry for this session (shared with the database).
354    #[cfg(feature = "metrics")]
355    pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
356        self.metrics = Some(metrics);
357    }
358
359    /// Creates a session backed by an external graph store.
360    ///
361    /// The external store handles all data operations. Transaction management
362    /// (begin/commit/rollback) is not supported for external stores.
363    ///
364    /// # Errors
365    ///
366    /// Returns an error if the internal arena allocation fails (out of memory).
367    pub(crate) fn with_external_store(
368        read_store: Arc<dyn GraphStoreSearch>,
369        write_store: Option<Arc<dyn GraphStoreMut>>,
370        cfg: SessionConfig,
371    ) -> Result<Self> {
372        Ok(Self {
373            #[cfg(feature = "lpg")]
374            store: Arc::new(LpgStore::new()?),
375            graph_store: read_store,
376            graph_store_mut: write_store,
377            catalog: cfg.catalog,
378            #[cfg(feature = "triple-store")]
379            rdf_store: Arc::new(RdfStore::new()),
380            transaction_manager: cfg.transaction_manager,
381            query_cache: cfg.query_cache,
382            current_transaction: parking_lot::Mutex::new(None),
383            read_only_tx: parking_lot::Mutex::new(cfg.read_only),
384            db_read_only: cfg.read_only,
385            identity: cfg.identity,
386            auto_commit: true,
387            adaptive_config: cfg.adaptive_config,
388            factorized_execution: cfg.factorized_execution,
389            graph_model: cfg.graph_model,
390            query_timeout: cfg.query_timeout,
391            max_property_size: cfg.max_property_size,
392            buffer_manager: cfg.buffer_manager,
393            commit_counter: cfg.commit_counter,
394            gc_interval: cfg.gc_interval,
395            transaction_start_node_count: AtomicUsize::new(0),
396            transaction_start_edge_count: AtomicUsize::new(0),
397            #[cfg(feature = "wal")]
398            wal: None,
399            #[cfg(feature = "wal")]
400            wal_graph_context: None,
401            #[cfg(feature = "cdc")]
402            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
403            #[cfg(feature = "cdc")]
404            cdc_pending_events: None,
405            current_graph: parking_lot::Mutex::new(None),
406            current_schema: parking_lot::Mutex::new(None),
407            time_zone: parking_lot::Mutex::new(None),
408            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
409            viewing_epoch_override: parking_lot::Mutex::new(None),
410            savepoints: parking_lot::Mutex::new(Vec::new()),
411            transaction_nesting_depth: parking_lot::Mutex::new(0),
412            touched_graphs: parking_lot::Mutex::new(Vec::new()),
413            active_streams: AtomicUsize::new(0),
414            #[cfg(feature = "metrics")]
415            metrics: None,
416            #[cfg(feature = "metrics")]
417            tx_start_time: parking_lot::Mutex::new(None),
418            #[cfg(feature = "lpg")]
419            projections: cfg.projections,
420        })
421    }
422
423    /// Returns the graph model this session operates on.
424    #[must_use]
425    pub fn graph_model(&self) -> GraphModel {
426        self.graph_model
427    }
428
429    /// Returns the identity bound to this session.
430    #[must_use]
431    pub fn identity(&self) -> &crate::auth::Identity {
432        &self.identity
433    }
434
435    // === Session State Management ===
436
437    /// Sets the current graph for this session (USE GRAPH).
438    pub fn use_graph(&self, name: &str) {
439        *self.current_graph.lock() = Some(name.to_string());
440        self.track_graph_touch();
441    }
442
443    /// Returns the current graph name, if any.
444    #[must_use]
445    pub fn current_graph(&self) -> Option<String> {
446        self.current_graph.lock().clone()
447    }
448
449    /// Sets the current schema for this session (SESSION SET SCHEMA).
450    ///
451    /// Per ISO/IEC 39075 Section 7.1 GR1, this is independent of the session graph.
452    pub fn set_schema(&self, name: &str) {
453        *self.current_schema.lock() = Some(name.to_string());
454        self.track_graph_touch();
455    }
456
457    /// Returns the current schema name, if any.
458    ///
459    /// `None` means "not set", which resolves to the default schema.
460    #[must_use]
461    pub fn current_schema(&self) -> Option<String> {
462        self.current_schema.lock().clone()
463    }
464
465    /// Computes the effective storage key for a graph, accounting for schema context.
466    ///
467    /// Per ISO/IEC 39075 Section 17.2, graphs resolve relative to the current schema.
468    /// Uses `/` as separator since it is invalid in GQL identifiers.
469    fn effective_graph_key(&self, graph_name: &str) -> String {
470        let schema = self.current_schema.lock().clone();
471        match schema {
472            Some(s) => format!("{s}/{graph_name}"),
473            None => graph_name.to_string(),
474        }
475    }
476
477    /// Computes the effective storage key for a type, accounting for schema context.
478    ///
479    /// Mirrors `effective_graph_key()`: types resolve relative to the current schema.
480    fn effective_type_key(&self, type_name: &str) -> String {
481        let schema = self.current_schema.lock().clone();
482        match schema {
483            Some(s) => format!("{s}/{type_name}"),
484            None => type_name.to_string(),
485        }
486    }
487
488    /// Returns the effective storage key for the current graph, accounting for schema.
489    ///
490    /// Combines `current_schema` and `current_graph` into a flat lookup key.
491    fn active_graph_storage_key(&self) -> Option<String> {
492        let graph = self.current_graph.lock().clone();
493        let schema = self.current_schema.lock().clone();
494        match (&schema, &graph) {
495            (None, None) => None,
496            (Some(s), None) => Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}")),
497            (None, Some(name)) if name.eq_ignore_ascii_case("default") => None,
498            (Some(s), Some(name)) if name.eq_ignore_ascii_case("default") => {
499                Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}"))
500            }
501            (None, Some(name)) => Some(name.clone()),
502            (Some(s), Some(g)) => Some(format!("{s}/{g}")),
503        }
504    }
505
506    /// Returns the graph store for the currently active graph.
507    ///
508    /// If `current_graph` is `None` or `"default"`, returns the session's
509    /// default `graph_store` (already WAL-wrapped for the default graph).
510    /// Otherwise looks up the named graph in the root store and wraps it
511    /// in a [`WalGraphStore`] so mutations are WAL-logged with the correct
512    /// graph context.
513    fn active_store(&self) -> Arc<dyn GraphStoreSearch> {
514        let key = self.active_graph_storage_key();
515        match key {
516            None => Arc::clone(&self.graph_store),
517            #[cfg(feature = "lpg")]
518            Some(ref name) => match self.store.graph(name) {
519                Some(named_store) => {
520                    #[cfg(feature = "wal")]
521                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
522                        return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
523                            named_store,
524                            Arc::clone(wal),
525                            name.clone(),
526                            Arc::clone(ctx),
527                        )) as Arc<dyn GraphStoreSearch>;
528                    }
529                    named_store as Arc<dyn GraphStoreSearch>
530                }
531                None => Arc::clone(&self.graph_store),
532            },
533            #[cfg(not(feature = "lpg"))]
534            Some(_) => Arc::clone(&self.graph_store),
535        }
536    }
537
538    /// Returns the writable store for the active graph, if available.
539    ///
540    /// Returns `None` for read-only databases. For named graphs, wraps
541    /// the store with WAL logging when durability is enabled.
542    fn active_write_store(&self) -> Option<Arc<dyn GraphStoreMut>> {
543        let key = self.active_graph_storage_key();
544        match key {
545            None => self.graph_store_mut.as_ref().map(Arc::clone),
546            #[cfg(feature = "lpg")]
547            Some(ref name) => match self.store.graph(name) {
548                Some(named_store) => {
549                    let mut store: Arc<dyn GraphStoreMut> = named_store;
550
551                    #[cfg(feature = "wal")]
552                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
553                        store = Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
554                            // WAL needs Arc<LpgStore>, get it fresh
555                            self.store
556                                .graph(name)
557                                .unwrap_or_else(|| Arc::clone(&self.store)),
558                            Arc::clone(wal),
559                            name.clone(),
560                            Arc::clone(ctx),
561                        ));
562                    }
563
564                    #[cfg(feature = "cdc")]
565                    if let Some(ref pending) = self.cdc_pending_events {
566                        store = Arc::new(crate::database::cdc_store::CdcGraphStore::wrap(
567                            store,
568                            Arc::clone(&self.cdc_log),
569                            Arc::clone(pending),
570                        ));
571                    }
572
573                    Some(store)
574                }
575                None => self.graph_store_mut.as_ref().map(Arc::clone),
576            },
577            #[cfg(not(feature = "lpg"))]
578            Some(_) => self.graph_store_mut.as_ref().map(Arc::clone),
579        }
580    }
581
582    /// Returns the concrete `LpgStore` for the currently active graph.
583    ///
584    /// Used by direct CRUD methods that need the concrete store type
585    /// for versioned operations.
586    #[cfg(feature = "lpg")]
587    fn active_lpg_store(&self) -> Arc<LpgStore> {
588        let key = self.active_graph_storage_key();
589        match key {
590            None => Arc::clone(&self.store),
591            Some(ref name) => self
592                .store
593                .graph(name)
594                .unwrap_or_else(|| Arc::clone(&self.store)),
595        }
596    }
597
598    /// Resolves a graph name to a concrete `LpgStore`.
599    /// `None` and `"default"` resolve to the session's root store.
600    #[cfg(feature = "lpg")]
601    fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
602        match graph_name {
603            None => Arc::clone(&self.store),
604            Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
605            Some(name) => self
606                .store
607                .graph(name)
608                .unwrap_or_else(|| Arc::clone(&self.store)),
609        }
610    }
611
612    /// Records the current graph as "touched" if a transaction is active.
613    ///
614    /// Uses the full storage key (schema/graph) so that commit/rollback
615    /// can resolve the correct store via `resolve_store`. Called from
616    /// every setter that can change the active key (`use_graph`,
617    /// `set_schema`, `reset_*`) so mid-transaction context switches are
618    /// always captured; callers that mutate the active key via those
619    /// setters do not need to invoke this directly.
620    fn track_graph_touch(&self) {
621        if self.current_transaction.lock().is_some() {
622            let key = self.active_graph_storage_key();
623            let mut touched = self.touched_graphs.lock();
624            if !touched.contains(&key) {
625                touched.push(key);
626            }
627        }
628    }
629
630    /// Sets the session time zone.
631    pub fn set_time_zone(&self, tz: &str) {
632        *self.time_zone.lock() = Some(tz.to_string());
633    }
634
635    /// Returns the session time zone, if set.
636    #[must_use]
637    pub fn time_zone(&self) -> Option<String> {
638        self.time_zone.lock().clone()
639    }
640
641    /// Sets a session parameter.
642    pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
643        self.session_params.lock().insert(key.to_string(), value);
644    }
645
646    /// Gets a session parameter by cloning it.
647    #[must_use]
648    pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
649        self.session_params.lock().get(key).cloned()
650    }
651
652    /// Resets all session state to defaults (ISO/IEC 39075 Section 7.2).
653    pub fn reset_session(&self) {
654        *self.current_schema.lock() = None;
655        *self.current_graph.lock() = None;
656        *self.time_zone.lock() = None;
657        self.session_params.lock().clear();
658        *self.viewing_epoch_override.lock() = None;
659        self.track_graph_touch();
660    }
661
662    /// Resets only the session schema (Section 7.2 GR1).
663    pub fn reset_schema(&self) {
664        *self.current_schema.lock() = None;
665        self.track_graph_touch();
666    }
667
668    /// Resets only the session graph (Section 7.2 GR2).
669    pub fn reset_graph(&self) {
670        *self.current_graph.lock() = None;
671        self.track_graph_touch();
672    }
673
674    /// Resets only the session time zone (Section 7.2 GR3).
675    pub fn reset_time_zone(&self) {
676        *self.time_zone.lock() = None;
677    }
678
679    /// Resets only session parameters (Section 7.2 GR4).
680    pub fn reset_parameters(&self) {
681        self.session_params.lock().clear();
682    }
683
684    // --- Time-travel API ---
685
686    /// Sets a viewing epoch override for time-travel queries.
687    ///
688    /// While set, all queries on this session see the database as it existed
689    /// at the given epoch. Use [`clear_viewing_epoch`](Self::clear_viewing_epoch)
690    /// to return to normal behavior.
691    pub fn set_viewing_epoch(&self, epoch: EpochId) {
692        *self.viewing_epoch_override.lock() = Some(epoch);
693    }
694
695    /// Clears the viewing epoch override, returning to normal behavior.
696    pub fn clear_viewing_epoch(&self) {
697        *self.viewing_epoch_override.lock() = None;
698    }
699
700    /// Returns the current viewing epoch override, if any.
701    #[must_use]
702    pub fn viewing_epoch(&self) -> Option<EpochId> {
703        *self.viewing_epoch_override.lock()
704    }
705
706    /// Returns all versions of a node with their creation/deletion epochs.
707    ///
708    /// Properties and labels reflect the current state (not versioned per-epoch).
709    #[cfg(feature = "lpg")]
710    #[must_use]
711    pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
712        self.active_lpg_store().get_node_history(id)
713    }
714
715    /// Returns all versions of an edge with their creation/deletion epochs.
716    ///
717    /// Properties reflect the current state (not versioned per-epoch).
718    #[cfg(feature = "lpg")]
719    #[must_use]
720    pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
721        self.active_lpg_store().get_edge_history(id)
722    }
723
724    /// Checks that the session's graph model supports LPG operations.
725    fn require_lpg(&self, language: &str) -> Result<()> {
726        if self.graph_model == GraphModel::Rdf {
727            return Err(grafeo_common::utils::error::Error::Internal(format!(
728                "This is an RDF database. {language} queries require an LPG database."
729            )));
730        }
731        Ok(())
732    }
733
734    /// Checks that the session's identity is permitted to execute the given
735    /// statement kind. Returns an error if the role is insufficient.
736    ///
737    /// Short-circuits for Admin identities (the common case for embedded use
738    /// and benchmarks) to avoid any overhead on the hot path.
739    #[inline]
740    fn require_permission(&self, kind: crate::auth::StatementKind) -> Result<()> {
741        // Fast path: Admin can do everything
742        if self.identity.can_admin() {
743            return Ok(());
744        }
745        crate::auth::check_permission(&self.identity, kind).map_err(|denied| {
746            grafeo_common::utils::error::Error::Query(grafeo_common::utils::error::QueryError::new(
747                grafeo_common::utils::error::QueryErrorKind::Semantic,
748                denied.to_string(),
749            ))
750        })
751    }
752
753    /// Executes a session or transaction command, returning an empty result.
754    #[cfg(feature = "gql")]
755    fn execute_session_command(
756        &self,
757        cmd: grafeo_adapters::query::gql::ast::SessionCommand,
758    ) -> Result<QueryResult> {
759        use grafeo_adapters::query::gql::ast::SessionCommand;
760        #[cfg(feature = "lpg")]
761        use grafeo_adapters::query::gql::ast::TransactionIsolationLevel;
762        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
763
764        // Check role-based permission for graph management commands
765        match &cmd {
766            SessionCommand::CreateGraph { .. }
767            | SessionCommand::DropGraph { .. }
768            | SessionCommand::CreateProjection { .. }
769            | SessionCommand::DropProjection { .. } => {
770                self.require_permission(crate::auth::StatementKind::Write)?;
771            }
772            _ => {} // Session state + transaction control: always allowed
773        }
774
775        // Check per-graph grants for graph-scoped commands
776        if self.identity.has_grants() {
777            match &cmd {
778                SessionCommand::CreateGraph { name, .. }
779                | SessionCommand::DropGraph { name, .. }
780                    if !self
781                        .identity
782                        .can_access_graph(name, crate::auth::Role::ReadWrite) =>
783                {
784                    return Err(Error::Query(QueryError::new(
785                        QueryErrorKind::Semantic,
786                        format!(
787                            "permission denied: no grant for graph '{name}' (user: {})",
788                            self.identity.user_id()
789                        ),
790                    )));
791                }
792                _ => {}
793            }
794        }
795
796        // Block DDL in read-only transactions (ISO/IEC 39075 Section 8)
797        if *self.read_only_tx.lock() {
798            match &cmd {
799                SessionCommand::CreateGraph { .. }
800                | SessionCommand::DropGraph { .. }
801                | SessionCommand::CreateProjection { .. }
802                | SessionCommand::DropProjection { .. } => {
803                    return Err(Error::Transaction(
804                        grafeo_common::utils::error::TransactionError::ReadOnly,
805                    ));
806                }
807                _ => {} // Session state + transaction control allowed
808            }
809        }
810
811        match cmd {
812            #[cfg(feature = "lpg")]
813            SessionCommand::CreateGraph {
814                name,
815                if_not_exists,
816                typed,
817                like_graph,
818                copy_of,
819                open: _,
820            } => {
821                // ISO/IEC 39075 Section 12.4: graphs are created within the current schema
822                if name.contains('/') {
823                    return Err(Error::Query(QueryError::new(
824                        QueryErrorKind::Semantic,
825                        format!(
826                            "Graph name '{name}' must not contain '/' (reserved as schema/graph separator)"
827                        ),
828                    )));
829                }
830                let storage_key = self.effective_graph_key(&name);
831
832                // Validate source graph exists for LIKE / AS COPY OF
833                if let Some(ref src) = like_graph {
834                    let src_key = self.effective_graph_key(src);
835                    if self.store.graph(&src_key).is_none() {
836                        return Err(Error::Query(QueryError::new(
837                            QueryErrorKind::Semantic,
838                            format!("Source graph '{src}' does not exist"),
839                        )));
840                    }
841                }
842                if let Some(ref src) = copy_of {
843                    let src_key = self.effective_graph_key(src);
844                    if self.store.graph(&src_key).is_none() {
845                        return Err(Error::Query(QueryError::new(
846                            QueryErrorKind::Semantic,
847                            format!("Source graph '{src}' does not exist"),
848                        )));
849                    }
850                }
851
852                let created = self
853                    .store
854                    .create_graph(&storage_key)
855                    .map_err(|e| Error::Internal(e.to_string()))?;
856                if !created && !if_not_exists {
857                    return Err(Error::Query(QueryError::new(
858                        QueryErrorKind::Semantic,
859                        format!("Graph '{name}' already exists"),
860                    )));
861                }
862                if created {
863                    #[cfg(feature = "wal")]
864                    self.log_schema_wal(&grafeo_storage::wal::WalRecord::CreateNamedGraph {
865                        name: storage_key.clone(),
866                    });
867                }
868
869                // AS COPY OF: copy data from source graph
870                if let Some(ref src) = copy_of {
871                    let src_key = self.effective_graph_key(src);
872                    self.store
873                        .copy_graph(Some(&src_key), Some(&storage_key))
874                        .map_err(|e| Error::Internal(e.to_string()))?;
875                }
876
877                // Bind to graph type if specified.
878                // If the parser produced a '/' in the name it is already a qualified
879                // "schema/type" key; otherwise resolve against the current schema.
880                if let Some(type_name) = typed
881                    && let Err(e) = self.catalog.bind_graph_type(
882                        &storage_key,
883                        if type_name.contains('/') {
884                            type_name.clone()
885                        } else {
886                            self.effective_type_key(&type_name)
887                        },
888                    )
889                {
890                    return Err(Error::Query(QueryError::new(
891                        QueryErrorKind::Semantic,
892                        e.to_string(),
893                    )));
894                }
895
896                // LIKE: copy graph type binding from source
897                if let Some(ref src) = like_graph {
898                    let src_key = self.effective_graph_key(src);
899                    if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
900                        let _ = self.catalog.bind_graph_type(&storage_key, src_type);
901                    }
902                }
903
904                Ok(QueryResult::empty())
905            }
906            #[cfg(feature = "lpg")]
907            SessionCommand::DropGraph { name, if_exists } => {
908                let storage_key = self.effective_graph_key(&name);
909                let dropped = self.store.drop_graph(&storage_key);
910                if !dropped && !if_exists {
911                    return Err(Error::Query(QueryError::new(
912                        QueryErrorKind::Semantic,
913                        format!("Graph '{name}' does not exist"),
914                    )));
915                }
916                if dropped {
917                    #[cfg(feature = "wal")]
918                    self.log_schema_wal(&grafeo_storage::wal::WalRecord::DropNamedGraph {
919                        name: storage_key.clone(),
920                    });
921                    // If this session was using the dropped graph, reset to default
922                    let mut current = self.current_graph.lock();
923                    if current
924                        .as_deref()
925                        .is_some_and(|g| g.eq_ignore_ascii_case(&name))
926                    {
927                        *current = None;
928                    }
929                }
930                Ok(QueryResult::empty())
931            }
932            #[cfg(feature = "lpg")]
933            SessionCommand::UseGraph(name) => {
934                // Check per-graph grant before switching
935                if self.identity.has_grants()
936                    && !name.eq_ignore_ascii_case("default")
937                    && !self
938                        .identity
939                        .can_access_graph(&name, crate::auth::Role::ReadOnly)
940                {
941                    return Err(Error::Query(QueryError::new(
942                        QueryErrorKind::Semantic,
943                        format!(
944                            "permission denied: no grant for graph '{name}' (user: {})",
945                            self.identity.user_id()
946                        ),
947                    )));
948                }
949                // Verify graph exists (resolve within current schema)
950                let effective_key = self.effective_graph_key(&name);
951                if !name.eq_ignore_ascii_case("default")
952                    && self.store.graph(&effective_key).is_none()
953                {
954                    return Err(Error::Query(QueryError::new(
955                        QueryErrorKind::Semantic,
956                        format!("Graph '{name}' does not exist"),
957                    )));
958                }
959                self.use_graph(&name);
960                Ok(QueryResult::empty())
961            }
962            #[cfg(feature = "lpg")]
963            SessionCommand::SessionSetGraph(name) => {
964                // ISO/IEC 39075 Section 7.1 GR2: set session graph (resolved within current schema)
965                // Check per-graph grant before switching (same as USE GRAPH)
966                if self.identity.has_grants()
967                    && !name.eq_ignore_ascii_case("default")
968                    && !self
969                        .identity
970                        .can_access_graph(&name, crate::auth::Role::ReadOnly)
971                {
972                    return Err(Error::Query(QueryError::new(
973                        QueryErrorKind::Semantic,
974                        format!(
975                            "permission denied: no grant for graph '{name}' (user: {})",
976                            self.identity.user_id()
977                        ),
978                    )));
979                }
980                let effective_key = self.effective_graph_key(&name);
981                if !name.eq_ignore_ascii_case("default")
982                    && self.store.graph(&effective_key).is_none()
983                {
984                    return Err(Error::Query(QueryError::new(
985                        QueryErrorKind::Semantic,
986                        format!("Graph '{name}' does not exist"),
987                    )));
988                }
989                self.use_graph(&name);
990                Ok(QueryResult::empty())
991            }
992            SessionCommand::SessionSetSchema(name) => {
993                // ISO/IEC 39075 Section 7.1 GR1: set session schema (independent of graph)
994                if !self.catalog.schema_exists(&name) {
995                    return Err(Error::Query(QueryError::new(
996                        QueryErrorKind::Semantic,
997                        format!("Schema '{name}' does not exist"),
998                    )));
999                }
1000                self.set_schema(&name);
1001                Ok(QueryResult::empty())
1002            }
1003            SessionCommand::SessionSetTimeZone(tz) => {
1004                self.set_time_zone(&tz);
1005                Ok(QueryResult::empty())
1006            }
1007            #[cfg(feature = "gql")]
1008            SessionCommand::SessionSetParameter(key, expr) => {
1009                if key.eq_ignore_ascii_case("viewing_epoch") {
1010                    match Self::eval_integer_literal(&expr) {
1011                        Some(n) if n >= 0 => {
1012                            // reason: guard ensures n >= 0
1013                            #[allow(clippy::cast_sign_loss)]
1014                            let epoch = n as u64;
1015                            self.set_viewing_epoch(EpochId::new(epoch));
1016                            Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
1017                        }
1018                        _ => Err(Error::Query(QueryError::new(
1019                            QueryErrorKind::Semantic,
1020                            "viewing_epoch must be a non-negative integer literal",
1021                        ))),
1022                    }
1023                } else {
1024                    // For now, store parameter name with Null value.
1025                    // Full expression evaluation would require building and executing a plan.
1026                    self.set_parameter(&key, Value::Null);
1027                    Ok(QueryResult::empty())
1028                }
1029            }
1030            SessionCommand::SessionReset(target) => {
1031                use grafeo_adapters::query::gql::ast::SessionResetTarget;
1032                match target {
1033                    SessionResetTarget::All => self.reset_session(),
1034                    SessionResetTarget::Schema => self.reset_schema(),
1035                    SessionResetTarget::Graph => self.reset_graph(),
1036                    SessionResetTarget::TimeZone => self.reset_time_zone(),
1037                    SessionResetTarget::Parameters => self.reset_parameters(),
1038                }
1039                Ok(QueryResult::empty())
1040            }
1041            SessionCommand::SessionClose => {
1042                self.reset_session();
1043                Ok(QueryResult::empty())
1044            }
1045            #[cfg(feature = "lpg")]
1046            SessionCommand::StartTransaction {
1047                read_only,
1048                isolation_level,
1049            } => {
1050                let engine_level = isolation_level.map(|l| match l {
1051                    TransactionIsolationLevel::ReadCommitted => {
1052                        crate::transaction::IsolationLevel::ReadCommitted
1053                    }
1054                    TransactionIsolationLevel::SnapshotIsolation => {
1055                        crate::transaction::IsolationLevel::SnapshotIsolation
1056                    }
1057                    TransactionIsolationLevel::Serializable => {
1058                        crate::transaction::IsolationLevel::Serializable
1059                    }
1060                });
1061                self.begin_transaction_inner(read_only, engine_level)?;
1062                Ok(QueryResult::status("Transaction started"))
1063            }
1064            #[cfg(feature = "lpg")]
1065            SessionCommand::Commit => {
1066                self.commit_inner()?;
1067                Ok(QueryResult::status("Transaction committed"))
1068            }
1069            #[cfg(feature = "lpg")]
1070            SessionCommand::Rollback => {
1071                self.rollback_inner()?;
1072                Ok(QueryResult::status("Transaction rolled back"))
1073            }
1074            #[cfg(feature = "lpg")]
1075            SessionCommand::Savepoint(name) => {
1076                self.savepoint(&name)?;
1077                Ok(QueryResult::status(format!("Savepoint '{name}' created")))
1078            }
1079            #[cfg(feature = "lpg")]
1080            SessionCommand::RollbackToSavepoint(name) => {
1081                self.rollback_to_savepoint(&name)?;
1082                Ok(QueryResult::status(format!(
1083                    "Rolled back to savepoint '{name}'"
1084                )))
1085            }
1086            #[cfg(feature = "lpg")]
1087            SessionCommand::ReleaseSavepoint(name) => {
1088                self.release_savepoint(&name)?;
1089                Ok(QueryResult::status(format!("Savepoint '{name}' released")))
1090            }
1091            #[cfg(feature = "lpg")]
1092            SessionCommand::CreateProjection {
1093                name,
1094                node_labels,
1095                edge_types,
1096            } => {
1097                use grafeo_core::graph::{GraphProjection, ProjectionSpec};
1098                use std::collections::hash_map::Entry;
1099
1100                let spec = ProjectionSpec::new()
1101                    .with_node_labels(node_labels)
1102                    .with_edge_types(edge_types);
1103
1104                let store = self.active_store();
1105                let projection = Arc::new(GraphProjection::new(store, spec));
1106                let mut projections = self.projections.write();
1107                match projections.entry(name.clone()) {
1108                    Entry::Occupied(_) => Err(Error::Query(QueryError::new(
1109                        QueryErrorKind::Semantic,
1110                        format!("Projection '{name}' already exists"),
1111                    ))),
1112                    Entry::Vacant(e) => {
1113                        e.insert(projection);
1114                        Ok(QueryResult::status(format!("Projection '{name}' created")))
1115                    }
1116                }
1117            }
1118            #[cfg(feature = "lpg")]
1119            SessionCommand::DropProjection { name } => {
1120                let removed = self.projections.write().remove(&name).is_some();
1121                if !removed {
1122                    return Err(Error::Query(QueryError::new(
1123                        QueryErrorKind::Semantic,
1124                        format!("Projection '{name}' does not exist"),
1125                    )));
1126                }
1127                Ok(QueryResult::status(format!("Projection '{name}' dropped")))
1128            }
1129            #[cfg(feature = "lpg")]
1130            SessionCommand::ShowProjections => {
1131                let mut names: Vec<String> = self.projections.read().keys().cloned().collect();
1132                names.sort();
1133                let rows: Vec<Vec<Value>> =
1134                    names.into_iter().map(|n| vec![Value::from(n)]).collect();
1135                Ok(QueryResult {
1136                    columns: vec!["name".to_string()],
1137                    column_types: Vec::new(),
1138                    rows,
1139                    ..QueryResult::empty()
1140                })
1141            }
1142            #[cfg(not(feature = "lpg"))]
1143            _ => Err(grafeo_common::utils::error::Error::Internal(
1144                "This command requires the `lpg` feature".to_string(),
1145            )),
1146        }
1147    }
1148
1149    /// Logs a WAL record for a schema change (no-op if WAL is not enabled).
1150    #[cfg(feature = "wal")]
1151    fn log_schema_wal(&self, record: &grafeo_storage::wal::WalRecord) {
1152        if let Some(ref wal) = self.wal
1153            && let Err(e) = wal.log(record)
1154        {
1155            grafeo_warn!("Failed to log schema change to WAL: {}", e);
1156        }
1157    }
1158
1159    /// Executes a schema DDL command, returning a status result.
1160    #[cfg(all(feature = "lpg", feature = "gql"))]
1161    fn execute_schema_command(
1162        &self,
1163        cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
1164    ) -> Result<QueryResult> {
1165        use crate::catalog::{
1166            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
1167        };
1168        use grafeo_adapters::query::gql::ast::SchemaStatement;
1169        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
1170        #[cfg(feature = "wal")]
1171        use grafeo_storage::wal::WalRecord;
1172
1173        /// Logs a WAL record for schema changes. Compiles to nothing without `wal`.
1174        macro_rules! wal_log {
1175            ($self:expr, $record:expr) => {
1176                #[cfg(feature = "wal")]
1177                $self.log_schema_wal(&$record);
1178            };
1179        }
1180
1181        let result = match cmd {
1182            SchemaStatement::CreateNodeType(stmt) => {
1183                let effective_name = self.effective_type_key(&stmt.name);
1184                #[cfg(feature = "wal")]
1185                let props_for_wal: Vec<(String, String, bool)> = stmt
1186                    .properties
1187                    .iter()
1188                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1189                    .collect();
1190                let def = NodeTypeDefinition {
1191                    name: effective_name.clone(),
1192                    properties: stmt
1193                        .properties
1194                        .iter()
1195                        .map(|p| TypedProperty {
1196                            name: p.name.clone(),
1197                            data_type: PropertyDataType::from_type_name(&p.data_type),
1198                            nullable: p.nullable,
1199                            default_value: p
1200                                .default_value
1201                                .as_ref()
1202                                .map(|s| parse_default_literal(s)),
1203                        })
1204                        .collect(),
1205                    constraints: Vec::new(),
1206                    parent_types: stmt.parent_types.clone(),
1207                };
1208                let result = if stmt.or_replace {
1209                    let _ = self.catalog.drop_node_type(&effective_name);
1210                    self.catalog.register_node_type(def)
1211                } else {
1212                    self.catalog.register_node_type(def)
1213                };
1214                match result {
1215                    Ok(()) => {
1216                        wal_log!(
1217                            self,
1218                            WalRecord::CreateNodeType {
1219                                name: effective_name.clone(),
1220                                properties: props_for_wal,
1221                                constraints: Vec::new(),
1222                            }
1223                        );
1224                        Ok(QueryResult::status(format!(
1225                            "Created node type '{}'",
1226                            stmt.name
1227                        )))
1228                    }
1229                    Err(e) if stmt.if_not_exists => {
1230                        let _ = e;
1231                        Ok(QueryResult::status("No change"))
1232                    }
1233                    Err(e) => Err(Error::Query(QueryError::new(
1234                        QueryErrorKind::Semantic,
1235                        e.to_string(),
1236                    ))),
1237                }
1238            }
1239            SchemaStatement::CreateEdgeType(stmt) => {
1240                let effective_name = self.effective_type_key(&stmt.name);
1241                #[cfg(feature = "wal")]
1242                let props_for_wal: Vec<(String, String, bool)> = stmt
1243                    .properties
1244                    .iter()
1245                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1246                    .collect();
1247                let def = EdgeTypeDefinition {
1248                    name: effective_name.clone(),
1249                    properties: stmt
1250                        .properties
1251                        .iter()
1252                        .map(|p| TypedProperty {
1253                            name: p.name.clone(),
1254                            data_type: PropertyDataType::from_type_name(&p.data_type),
1255                            nullable: p.nullable,
1256                            default_value: p
1257                                .default_value
1258                                .as_ref()
1259                                .map(|s| parse_default_literal(s)),
1260                        })
1261                        .collect(),
1262                    constraints: Vec::new(),
1263                    source_node_types: stmt.source_node_types.clone(),
1264                    target_node_types: stmt.target_node_types.clone(),
1265                };
1266                let result = if stmt.or_replace {
1267                    let _ = self.catalog.drop_edge_type_def(&effective_name);
1268                    self.catalog.register_edge_type_def(def)
1269                } else {
1270                    self.catalog.register_edge_type_def(def)
1271                };
1272                match result {
1273                    Ok(()) => {
1274                        wal_log!(
1275                            self,
1276                            WalRecord::CreateEdgeType {
1277                                name: effective_name.clone(),
1278                                properties: props_for_wal,
1279                                constraints: Vec::new(),
1280                            }
1281                        );
1282                        Ok(QueryResult::status(format!(
1283                            "Created edge type '{}'",
1284                            stmt.name
1285                        )))
1286                    }
1287                    Err(e) if stmt.if_not_exists => {
1288                        let _ = e;
1289                        Ok(QueryResult::status("No change"))
1290                    }
1291                    Err(e) => Err(Error::Query(QueryError::new(
1292                        QueryErrorKind::Semantic,
1293                        e.to_string(),
1294                    ))),
1295                }
1296            }
1297            SchemaStatement::CreateVectorIndex(stmt) => {
1298                Self::create_vector_index_on_store(
1299                    &self.active_lpg_store(),
1300                    &stmt.node_label,
1301                    &stmt.property,
1302                    stmt.dimensions,
1303                    stmt.metric.as_deref(),
1304                )?;
1305                wal_log!(
1306                    self,
1307                    WalRecord::CreateIndex {
1308                        name: stmt.name.clone(),
1309                        label: stmt.node_label.clone(),
1310                        property: stmt.property.clone(),
1311                        index_type: "vector".to_string(),
1312                    }
1313                );
1314                Ok(QueryResult::status(format!(
1315                    "Created vector index '{}'",
1316                    stmt.name
1317                )))
1318            }
1319            SchemaStatement::DropNodeType { name, if_exists } => {
1320                let effective_name = self.effective_type_key(&name);
1321                match self.catalog.drop_node_type(&effective_name) {
1322                    Ok(()) => {
1323                        wal_log!(
1324                            self,
1325                            WalRecord::DropNodeType {
1326                                name: effective_name
1327                            }
1328                        );
1329                        Ok(QueryResult::status(format!("Dropped node type '{name}'")))
1330                    }
1331                    Err(e) if if_exists => {
1332                        let _ = e;
1333                        Ok(QueryResult::status("No change"))
1334                    }
1335                    Err(e) => Err(Error::Query(QueryError::new(
1336                        QueryErrorKind::Semantic,
1337                        e.to_string(),
1338                    ))),
1339                }
1340            }
1341            SchemaStatement::DropEdgeType { name, if_exists } => {
1342                let effective_name = self.effective_type_key(&name);
1343                match self.catalog.drop_edge_type_def(&effective_name) {
1344                    Ok(()) => {
1345                        wal_log!(
1346                            self,
1347                            WalRecord::DropEdgeType {
1348                                name: effective_name
1349                            }
1350                        );
1351                        Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
1352                    }
1353                    Err(e) if if_exists => {
1354                        let _ = e;
1355                        Ok(QueryResult::status("No change"))
1356                    }
1357                    Err(e) => Err(Error::Query(QueryError::new(
1358                        QueryErrorKind::Semantic,
1359                        e.to_string(),
1360                    ))),
1361                }
1362            }
1363            SchemaStatement::CreateIndex(stmt) => {
1364                use crate::catalog::IndexType as CatalogIndexType;
1365                use grafeo_adapters::query::gql::ast::IndexKind;
1366                let active = self.active_lpg_store();
1367                let index_type_str = match stmt.index_kind {
1368                    IndexKind::Property => "property",
1369                    IndexKind::BTree => "btree",
1370                    IndexKind::Text => "text",
1371                    IndexKind::Vector => "vector",
1372                };
1373                match stmt.index_kind {
1374                    IndexKind::Property | IndexKind::BTree => {
1375                        for prop in &stmt.properties {
1376                            active.create_property_index(prop);
1377                        }
1378                    }
1379                    IndexKind::Text => {
1380                        for prop in &stmt.properties {
1381                            Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1382                        }
1383                    }
1384                    IndexKind::Vector => {
1385                        for prop in &stmt.properties {
1386                            Self::create_vector_index_on_store(
1387                                &active,
1388                                &stmt.label,
1389                                prop,
1390                                stmt.options.dimensions,
1391                                stmt.options.metric.as_deref(),
1392                            )?;
1393                        }
1394                    }
1395                }
1396                // Register each property index in the catalog for SHOW INDEXES
1397                // and DROP INDEX name-based lookup.
1398                let catalog_index_type = match stmt.index_kind {
1399                    IndexKind::Property => CatalogIndexType::Hash,
1400                    IndexKind::BTree => CatalogIndexType::BTree,
1401                    IndexKind::Text => CatalogIndexType::FullText,
1402                    IndexKind::Vector => CatalogIndexType::Hash,
1403                };
1404                let label_id = self.catalog.get_or_create_label(&stmt.label);
1405                for prop in &stmt.properties {
1406                    let prop_id = self.catalog.get_or_create_property_key(prop);
1407                    self.catalog
1408                        .create_index(&stmt.name, label_id, prop_id, catalog_index_type);
1409                }
1410                #[cfg(feature = "wal")]
1411                for prop in &stmt.properties {
1412                    wal_log!(
1413                        self,
1414                        WalRecord::CreateIndex {
1415                            name: stmt.name.clone(),
1416                            label: stmt.label.clone(),
1417                            property: prop.clone(),
1418                            index_type: index_type_str.to_string(),
1419                        }
1420                    );
1421                }
1422                Ok(QueryResult::status(format!(
1423                    "Created {} index '{}'",
1424                    index_type_str, stmt.name
1425                )))
1426            }
1427            SchemaStatement::DropIndex { name, if_exists } => {
1428                // Look up the index by name in the catalog to find the
1429                // underlying property, then drop from both catalog and store.
1430                if let Some(index_id) = self.catalog.find_index_by_name(&name) {
1431                    let def = self.catalog.get_index(index_id);
1432                    self.catalog.drop_index(index_id);
1433                    if let Some(def) = def
1434                        && let Some(prop_name) =
1435                            self.catalog.get_property_key_name(def.property_key)
1436                    {
1437                        self.active_lpg_store().drop_property_index(&prop_name);
1438                    }
1439                    wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1440                    Ok(QueryResult::status(format!("Dropped index '{name}'")))
1441                } else if if_exists {
1442                    Ok(QueryResult::status("No change".to_string()))
1443                } else {
1444                    Err(Error::Query(QueryError::new(
1445                        QueryErrorKind::Semantic,
1446                        format!("Index '{name}' does not exist"),
1447                    )))
1448                }
1449            }
1450            SchemaStatement::CreateConstraint(stmt) => {
1451                use crate::catalog::TypeConstraint;
1452                use grafeo_adapters::query::gql::ast::ConstraintKind;
1453                let kind_str = match stmt.constraint_kind {
1454                    ConstraintKind::Unique => "unique",
1455                    ConstraintKind::NodeKey => "node_key",
1456                    ConstraintKind::NotNull => "not_null",
1457                    ConstraintKind::Exists => "exists",
1458                };
1459                let constraint_name = stmt
1460                    .name
1461                    .clone()
1462                    .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1463
1464                // Register constraint in catalog type definitions
1465                match stmt.constraint_kind {
1466                    ConstraintKind::Unique => {
1467                        for prop in &stmt.properties {
1468                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1469                            let prop_id = self.catalog.get_or_create_property_key(prop);
1470                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1471                        }
1472                        let _ = self.catalog.add_constraint_to_type(
1473                            &stmt.label,
1474                            TypeConstraint::Unique(stmt.properties.clone()),
1475                        );
1476                    }
1477                    ConstraintKind::NodeKey => {
1478                        for prop in &stmt.properties {
1479                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1480                            let prop_id = self.catalog.get_or_create_property_key(prop);
1481                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1482                            let _ = self.catalog.add_required_property(label_id, prop_id);
1483                        }
1484                        let _ = self.catalog.add_constraint_to_type(
1485                            &stmt.label,
1486                            TypeConstraint::PrimaryKey(stmt.properties.clone()),
1487                        );
1488                    }
1489                    ConstraintKind::NotNull | ConstraintKind::Exists => {
1490                        for prop in &stmt.properties {
1491                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1492                            let prop_id = self.catalog.get_or_create_property_key(prop);
1493                            let _ = self.catalog.add_required_property(label_id, prop_id);
1494                            let _ = self.catalog.add_constraint_to_type(
1495                                &stmt.label,
1496                                TypeConstraint::NotNull(prop.clone()),
1497                            );
1498                        }
1499                    }
1500                }
1501
1502                wal_log!(
1503                    self,
1504                    WalRecord::CreateConstraint {
1505                        name: constraint_name.clone(),
1506                        label: stmt.label.clone(),
1507                        properties: stmt.properties.clone(),
1508                        kind: kind_str.to_string(),
1509                    }
1510                );
1511                Ok(QueryResult::status(format!(
1512                    "Created {kind_str} constraint '{constraint_name}'"
1513                )))
1514            }
1515            SchemaStatement::DropConstraint { name, if_exists } => {
1516                let _ = if_exists;
1517                wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1518                Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1519            }
1520            SchemaStatement::CreateGraphType(stmt) => {
1521                use crate::catalog::GraphTypeDefinition;
1522                use grafeo_adapters::query::gql::ast::InlineElementType;
1523
1524                let effective_name = self.effective_type_key(&stmt.name);
1525
1526                // GG04: LIKE clause copies type from existing graph
1527                let (mut node_types, mut edge_types, open) =
1528                    if let Some(ref like_graph) = stmt.like_graph {
1529                        // Infer types from the graph's bound type, or use its existing types
1530                        if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1531                            if let Some(existing) = self
1532                                .catalog
1533                                .schema()
1534                                .and_then(|s| s.get_graph_type(&type_name))
1535                            {
1536                                (
1537                                    existing.allowed_node_types.clone(),
1538                                    existing.allowed_edge_types.clone(),
1539                                    existing.open,
1540                                )
1541                            } else {
1542                                (Vec::new(), Vec::new(), true)
1543                            }
1544                        } else {
1545                            // GG22: Infer from graph data (labels used in graph)
1546                            let nt = self.catalog.all_node_type_names();
1547                            let et = self.catalog.all_edge_type_names();
1548                            if nt.is_empty() && et.is_empty() {
1549                                (Vec::new(), Vec::new(), true)
1550                            } else {
1551                                (nt, et, false)
1552                            }
1553                        }
1554                    } else {
1555                        // Prefix element type names with schema for consistency
1556                        let nt = stmt
1557                            .node_types
1558                            .iter()
1559                            .map(|n| self.effective_type_key(n))
1560                            .collect();
1561                        let et = stmt
1562                            .edge_types
1563                            .iter()
1564                            .map(|n| self.effective_type_key(n))
1565                            .collect();
1566                        (nt, et, stmt.open)
1567                    };
1568
1569                // GG03: Register inline element types and add their names
1570                for inline in &stmt.inline_types {
1571                    match inline {
1572                        InlineElementType::Node {
1573                            name,
1574                            properties,
1575                            key_labels,
1576                            ..
1577                        } => {
1578                            let inline_effective = self.effective_type_key(name);
1579                            let def = NodeTypeDefinition {
1580                                name: inline_effective.clone(),
1581                                properties: properties
1582                                    .iter()
1583                                    .map(|p| TypedProperty {
1584                                        name: p.name.clone(),
1585                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1586                                        nullable: p.nullable,
1587                                        default_value: None,
1588                                    })
1589                                    .collect(),
1590                                constraints: Vec::new(),
1591                                parent_types: key_labels.clone(),
1592                            };
1593                            // Register or replace so inline defs override existing
1594                            self.catalog.register_or_replace_node_type(def);
1595                            #[cfg(feature = "wal")]
1596                            {
1597                                let props_for_wal: Vec<(String, String, bool)> = properties
1598                                    .iter()
1599                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1600                                    .collect();
1601                                self.log_schema_wal(&WalRecord::CreateNodeType {
1602                                    name: inline_effective.clone(),
1603                                    properties: props_for_wal,
1604                                    constraints: Vec::new(),
1605                                });
1606                            }
1607                            if !node_types.contains(&inline_effective) {
1608                                node_types.push(inline_effective);
1609                            }
1610                        }
1611                        InlineElementType::Edge {
1612                            name,
1613                            properties,
1614                            source_node_types,
1615                            target_node_types,
1616                            ..
1617                        } => {
1618                            let inline_effective = self.effective_type_key(name);
1619                            let def = EdgeTypeDefinition {
1620                                name: inline_effective.clone(),
1621                                properties: properties
1622                                    .iter()
1623                                    .map(|p| TypedProperty {
1624                                        name: p.name.clone(),
1625                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1626                                        nullable: p.nullable,
1627                                        default_value: None,
1628                                    })
1629                                    .collect(),
1630                                constraints: Vec::new(),
1631                                source_node_types: source_node_types.clone(),
1632                                target_node_types: target_node_types.clone(),
1633                            };
1634                            self.catalog.register_or_replace_edge_type_def(def);
1635                            #[cfg(feature = "wal")]
1636                            {
1637                                let props_for_wal: Vec<(String, String, bool)> = properties
1638                                    .iter()
1639                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1640                                    .collect();
1641                                self.log_schema_wal(&WalRecord::CreateEdgeType {
1642                                    name: inline_effective.clone(),
1643                                    properties: props_for_wal,
1644                                    constraints: Vec::new(),
1645                                });
1646                            }
1647                            if !edge_types.contains(&inline_effective) {
1648                                edge_types.push(inline_effective);
1649                            }
1650                        }
1651                    }
1652                }
1653
1654                let def = GraphTypeDefinition {
1655                    name: effective_name.clone(),
1656                    allowed_node_types: node_types.clone(),
1657                    allowed_edge_types: edge_types.clone(),
1658                    open,
1659                };
1660                let result = if stmt.or_replace {
1661                    // Drop existing first, ignore error if not found
1662                    let _ = self.catalog.drop_graph_type(&effective_name);
1663                    self.catalog.register_graph_type(def)
1664                } else {
1665                    self.catalog.register_graph_type(def)
1666                };
1667                match result {
1668                    Ok(()) => {
1669                        wal_log!(
1670                            self,
1671                            WalRecord::CreateGraphType {
1672                                name: effective_name.clone(),
1673                                node_types,
1674                                edge_types,
1675                                open,
1676                            }
1677                        );
1678                        Ok(QueryResult::status(format!(
1679                            "Created graph type '{}'",
1680                            stmt.name
1681                        )))
1682                    }
1683                    Err(e) if stmt.if_not_exists => {
1684                        let _ = e;
1685                        Ok(QueryResult::status("No change"))
1686                    }
1687                    Err(e) => Err(Error::Query(QueryError::new(
1688                        QueryErrorKind::Semantic,
1689                        e.to_string(),
1690                    ))),
1691                }
1692            }
1693            SchemaStatement::DropGraphType { name, if_exists } => {
1694                let effective_name = self.effective_type_key(&name);
1695                match self.catalog.drop_graph_type(&effective_name) {
1696                    Ok(()) => {
1697                        wal_log!(
1698                            self,
1699                            WalRecord::DropGraphType {
1700                                name: effective_name
1701                            }
1702                        );
1703                        Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1704                    }
1705                    Err(e) if if_exists => {
1706                        let _ = e;
1707                        Ok(QueryResult::status("No change"))
1708                    }
1709                    Err(e) => Err(Error::Query(QueryError::new(
1710                        QueryErrorKind::Semantic,
1711                        e.to_string(),
1712                    ))),
1713                }
1714            }
1715            SchemaStatement::CreateSchema {
1716                name,
1717                if_not_exists,
1718            } => {
1719                if name.contains('/') {
1720                    return Err(Error::Query(QueryError::new(
1721                        QueryErrorKind::Semantic,
1722                        format!(
1723                            "Schema name '{name}' must not contain '/' (reserved as schema/graph separator)"
1724                        ),
1725                    )));
1726                }
1727                match self.catalog.register_schema_namespace(name.clone()) {
1728                    Ok(()) => {
1729                        wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1730                        // Auto-create the schema's default graph partition so that
1731                        // SESSION SET SCHEMA + queries work without an explicit graph.
1732                        let default_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1733                        if self.store.create_graph(&default_key).unwrap_or(false) {
1734                            wal_log!(self, WalRecord::CreateNamedGraph { name: default_key });
1735                        }
1736                        Ok(QueryResult::status(format!("Created schema '{name}'")))
1737                    }
1738                    Err(e) if if_not_exists => {
1739                        let _ = e;
1740                        Ok(QueryResult::status("No change"))
1741                    }
1742                    Err(e) => Err(Error::Query(QueryError::new(
1743                        QueryErrorKind::Semantic,
1744                        e.to_string(),
1745                    ))),
1746                }
1747            }
1748            SchemaStatement::DropSchema { name, if_exists } => {
1749                // ISO/IEC 39075 Section 12.3: schema must be empty before dropping.
1750                // The auto-created __default__ graph is exempt from the check.
1751                let prefix = format!("{name}/");
1752                let default_graph_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1753                let has_graphs = self
1754                    .store
1755                    .graph_names()
1756                    .iter()
1757                    .any(|g| g.starts_with(&prefix) && *g != default_graph_key);
1758                let has_types = self
1759                    .catalog
1760                    .all_node_type_names()
1761                    .iter()
1762                    .any(|n| n.starts_with(&prefix))
1763                    || self
1764                        .catalog
1765                        .all_edge_type_names()
1766                        .iter()
1767                        .any(|n| n.starts_with(&prefix))
1768                    || self
1769                        .catalog
1770                        .all_graph_type_names()
1771                        .iter()
1772                        .any(|n| n.starts_with(&prefix));
1773                if has_graphs || has_types {
1774                    return Err(Error::Query(QueryError::new(
1775                        QueryErrorKind::Semantic,
1776                        format!("Schema '{name}' is not empty: drop all graphs and types first"),
1777                    )));
1778                }
1779                match self.catalog.drop_schema_namespace(&name) {
1780                    Ok(()) => {
1781                        wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1782                        // Drop the auto-created default graph partition
1783                        if self.store.drop_graph(&default_graph_key) {
1784                            wal_log!(
1785                                self,
1786                                WalRecord::DropNamedGraph {
1787                                    name: default_graph_key,
1788                                }
1789                            );
1790                        }
1791                        // If this session was using the dropped schema, reset it
1792                        let mut current = self.current_schema.lock();
1793                        if current
1794                            .as_deref()
1795                            .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1796                        {
1797                            *current = None;
1798                        }
1799                        Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1800                    }
1801                    Err(e) if if_exists => {
1802                        let _ = e;
1803                        Ok(QueryResult::status("No change"))
1804                    }
1805                    Err(e) => Err(Error::Query(QueryError::new(
1806                        QueryErrorKind::Semantic,
1807                        e.to_string(),
1808                    ))),
1809                }
1810            }
1811            SchemaStatement::AlterNodeType(stmt) => {
1812                use grafeo_adapters::query::gql::ast::TypeAlteration;
1813                let effective_name = self.effective_type_key(&stmt.name);
1814                let mut wal_alts = Vec::new();
1815                for alt in &stmt.alterations {
1816                    match alt {
1817                        TypeAlteration::AddProperty(prop) => {
1818                            let typed = TypedProperty {
1819                                name: prop.name.clone(),
1820                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1821                                nullable: prop.nullable,
1822                                default_value: prop
1823                                    .default_value
1824                                    .as_ref()
1825                                    .map(|s| parse_default_literal(s)),
1826                            };
1827                            self.catalog
1828                                .alter_node_type_add_property(&effective_name, typed)
1829                                .map_err(|e| {
1830                                    Error::Query(QueryError::new(
1831                                        QueryErrorKind::Semantic,
1832                                        e.to_string(),
1833                                    ))
1834                                })?;
1835                            wal_alts.push((
1836                                "add".to_string(),
1837                                prop.name.clone(),
1838                                prop.data_type.clone(),
1839                                prop.nullable,
1840                            ));
1841                        }
1842                        TypeAlteration::DropProperty(name) => {
1843                            self.catalog
1844                                .alter_node_type_drop_property(&effective_name, name)
1845                                .map_err(|e| {
1846                                    Error::Query(QueryError::new(
1847                                        QueryErrorKind::Semantic,
1848                                        e.to_string(),
1849                                    ))
1850                                })?;
1851                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1852                        }
1853                    }
1854                }
1855                wal_log!(
1856                    self,
1857                    WalRecord::AlterNodeType {
1858                        name: effective_name,
1859                        alterations: wal_alts,
1860                    }
1861                );
1862                Ok(QueryResult::status(format!(
1863                    "Altered node type '{}'",
1864                    stmt.name
1865                )))
1866            }
1867            SchemaStatement::AlterEdgeType(stmt) => {
1868                use grafeo_adapters::query::gql::ast::TypeAlteration;
1869                let effective_name = self.effective_type_key(&stmt.name);
1870                let mut wal_alts = Vec::new();
1871                for alt in &stmt.alterations {
1872                    match alt {
1873                        TypeAlteration::AddProperty(prop) => {
1874                            let typed = TypedProperty {
1875                                name: prop.name.clone(),
1876                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1877                                nullable: prop.nullable,
1878                                default_value: prop
1879                                    .default_value
1880                                    .as_ref()
1881                                    .map(|s| parse_default_literal(s)),
1882                            };
1883                            self.catalog
1884                                .alter_edge_type_add_property(&effective_name, typed)
1885                                .map_err(|e| {
1886                                    Error::Query(QueryError::new(
1887                                        QueryErrorKind::Semantic,
1888                                        e.to_string(),
1889                                    ))
1890                                })?;
1891                            wal_alts.push((
1892                                "add".to_string(),
1893                                prop.name.clone(),
1894                                prop.data_type.clone(),
1895                                prop.nullable,
1896                            ));
1897                        }
1898                        TypeAlteration::DropProperty(name) => {
1899                            self.catalog
1900                                .alter_edge_type_drop_property(&effective_name, name)
1901                                .map_err(|e| {
1902                                    Error::Query(QueryError::new(
1903                                        QueryErrorKind::Semantic,
1904                                        e.to_string(),
1905                                    ))
1906                                })?;
1907                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1908                        }
1909                    }
1910                }
1911                wal_log!(
1912                    self,
1913                    WalRecord::AlterEdgeType {
1914                        name: effective_name,
1915                        alterations: wal_alts,
1916                    }
1917                );
1918                Ok(QueryResult::status(format!(
1919                    "Altered edge type '{}'",
1920                    stmt.name
1921                )))
1922            }
1923            SchemaStatement::AlterGraphType(stmt) => {
1924                use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1925                let effective_name = self.effective_type_key(&stmt.name);
1926                let mut wal_alts = Vec::new();
1927                for alt in &stmt.alterations {
1928                    match alt {
1929                        GraphTypeAlteration::AddNodeType(name) => {
1930                            self.catalog
1931                                .alter_graph_type_add_node_type(&effective_name, name.clone())
1932                                .map_err(|e| {
1933                                    Error::Query(QueryError::new(
1934                                        QueryErrorKind::Semantic,
1935                                        e.to_string(),
1936                                    ))
1937                                })?;
1938                            wal_alts.push(("add_node_type".to_string(), name.clone()));
1939                        }
1940                        GraphTypeAlteration::DropNodeType(name) => {
1941                            self.catalog
1942                                .alter_graph_type_drop_node_type(&effective_name, name)
1943                                .map_err(|e| {
1944                                    Error::Query(QueryError::new(
1945                                        QueryErrorKind::Semantic,
1946                                        e.to_string(),
1947                                    ))
1948                                })?;
1949                            wal_alts.push(("drop_node_type".to_string(), name.clone()));
1950                        }
1951                        GraphTypeAlteration::AddEdgeType(name) => {
1952                            self.catalog
1953                                .alter_graph_type_add_edge_type(&effective_name, name.clone())
1954                                .map_err(|e| {
1955                                    Error::Query(QueryError::new(
1956                                        QueryErrorKind::Semantic,
1957                                        e.to_string(),
1958                                    ))
1959                                })?;
1960                            wal_alts.push(("add_edge_type".to_string(), name.clone()));
1961                        }
1962                        GraphTypeAlteration::DropEdgeType(name) => {
1963                            self.catalog
1964                                .alter_graph_type_drop_edge_type(&effective_name, name)
1965                                .map_err(|e| {
1966                                    Error::Query(QueryError::new(
1967                                        QueryErrorKind::Semantic,
1968                                        e.to_string(),
1969                                    ))
1970                                })?;
1971                            wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1972                        }
1973                    }
1974                }
1975                wal_log!(
1976                    self,
1977                    WalRecord::AlterGraphType {
1978                        name: effective_name,
1979                        alterations: wal_alts,
1980                    }
1981                );
1982                Ok(QueryResult::status(format!(
1983                    "Altered graph type '{}'",
1984                    stmt.name
1985                )))
1986            }
1987            SchemaStatement::CreateProcedure(stmt) => {
1988                use crate::catalog::ProcedureDefinition;
1989
1990                let def = ProcedureDefinition {
1991                    name: stmt.name.clone(),
1992                    params: stmt
1993                        .params
1994                        .iter()
1995                        .map(|p| (p.name.clone(), p.param_type.clone()))
1996                        .collect(),
1997                    returns: stmt
1998                        .returns
1999                        .iter()
2000                        .map(|r| (r.name.clone(), r.return_type.clone()))
2001                        .collect(),
2002                    body: stmt.body.clone(),
2003                };
2004
2005                if stmt.or_replace {
2006                    self.catalog.replace_procedure(def).map_err(|e| {
2007                        Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
2008                    })?;
2009                } else {
2010                    match self.catalog.register_procedure(def) {
2011                        Ok(()) => {}
2012                        Err(_) if stmt.if_not_exists => {
2013                            return Ok(QueryResult::empty());
2014                        }
2015                        Err(e) => {
2016                            return Err(Error::Query(QueryError::new(
2017                                QueryErrorKind::Semantic,
2018                                e.to_string(),
2019                            )));
2020                        }
2021                    }
2022                }
2023
2024                wal_log!(
2025                    self,
2026                    WalRecord::CreateProcedure {
2027                        name: stmt.name.clone(),
2028                        params: stmt
2029                            .params
2030                            .iter()
2031                            .map(|p| (p.name.clone(), p.param_type.clone()))
2032                            .collect(),
2033                        returns: stmt
2034                            .returns
2035                            .iter()
2036                            .map(|r| (r.name.clone(), r.return_type.clone()))
2037                            .collect(),
2038                        body: stmt.body,
2039                    }
2040                );
2041                Ok(QueryResult::status(format!(
2042                    "Created procedure '{}'",
2043                    stmt.name
2044                )))
2045            }
2046            SchemaStatement::DropProcedure { name, if_exists } => {
2047                match self.catalog.drop_procedure(&name) {
2048                    Ok(()) => {}
2049                    Err(_) if if_exists => {
2050                        return Ok(QueryResult::empty());
2051                    }
2052                    Err(e) => {
2053                        return Err(Error::Query(QueryError::new(
2054                            QueryErrorKind::Semantic,
2055                            e.to_string(),
2056                        )));
2057                    }
2058                }
2059                wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
2060                Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
2061            }
2062            SchemaStatement::ShowIndexes => {
2063                return self.execute_show_indexes();
2064            }
2065            SchemaStatement::ShowConstraints => {
2066                return self.execute_show_constraints();
2067            }
2068            SchemaStatement::ShowNodeTypes => {
2069                return self.execute_show_node_types();
2070            }
2071            SchemaStatement::ShowEdgeTypes => {
2072                return self.execute_show_edge_types();
2073            }
2074            SchemaStatement::ShowGraphTypes => {
2075                return self.execute_show_graph_types();
2076            }
2077            SchemaStatement::ShowGraphType(name) => {
2078                return self.execute_show_graph_type(&name);
2079            }
2080            SchemaStatement::ShowCurrentGraphType => {
2081                return self.execute_show_current_graph_type();
2082            }
2083            SchemaStatement::ShowGraphs => {
2084                return self.execute_show_graphs();
2085            }
2086            SchemaStatement::ShowSchemas => {
2087                return self.execute_show_schemas();
2088            }
2089        };
2090
2091        // Invalidate all cached query plans after any successful DDL change.
2092        // DDL is rare, so clearing the entire cache is cheap and correct.
2093        if result.is_ok() {
2094            self.query_cache.clear();
2095        }
2096
2097        result
2098    }
2099
2100    /// Creates a vector index on the store by scanning existing nodes.
2101    #[cfg(all(feature = "lpg", feature = "gql", feature = "vector-index"))]
2102    fn create_vector_index_on_store(
2103        store: &LpgStore,
2104        label: &str,
2105        property: &str,
2106        dimensions: Option<usize>,
2107        metric: Option<&str>,
2108    ) -> Result<()> {
2109        use grafeo_common::types::{PropertyKey, Value};
2110        use grafeo_common::utils::error::Error;
2111        use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex, VectorIndexKind};
2112
2113        let metric = match metric {
2114            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
2115                Error::Internal(format!(
2116                    "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
2117                ))
2118            })?,
2119            None => DistanceMetric::Cosine,
2120        };
2121
2122        let prop_key = PropertyKey::new(property);
2123        let mut found_dims: Option<usize> = dimensions;
2124        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
2125
2126        for node in store.nodes_with_label(label) {
2127            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
2128                if let Some(expected) = found_dims {
2129                    if v.len() != expected {
2130                        return Err(Error::Internal(format!(
2131                            "Vector dimension mismatch: expected {expected}, found {} on node {}",
2132                            v.len(),
2133                            node.id.0
2134                        )));
2135                    }
2136                } else {
2137                    found_dims = Some(v.len());
2138                }
2139                vectors.push((node.id, v.to_vec()));
2140            }
2141        }
2142
2143        let Some(dims) = found_dims else {
2144            return Err(Error::Internal(format!(
2145                "No vector properties found on :{label}({property}) and no dimensions specified"
2146            )));
2147        };
2148
2149        let config = HnswConfig::new(dims, metric);
2150        let index = HnswIndex::with_capacity(config, vectors.len());
2151        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
2152        for (node_id, vec) in &vectors {
2153            index.insert(*node_id, vec, &accessor);
2154        }
2155
2156        store.add_vector_index(label, property, Arc::new(VectorIndexKind::Hnsw(index)));
2157        Ok(())
2158    }
2159
2160    /// Stub for when vector-index feature is not enabled.
2161    #[cfg(all(feature = "lpg", feature = "gql", not(feature = "vector-index")))]
2162    fn create_vector_index_on_store(
2163        _store: &LpgStore,
2164        _label: &str,
2165        _property: &str,
2166        _dimensions: Option<usize>,
2167        _metric: Option<&str>,
2168    ) -> Result<()> {
2169        Err(grafeo_common::utils::error::Error::Internal(
2170            "Vector index support requires the 'vector-index' feature".to_string(),
2171        ))
2172    }
2173
2174    /// Creates a text index on the store by scanning existing nodes.
2175    #[cfg(all(feature = "lpg", feature = "gql", feature = "text-index"))]
2176    fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
2177        use grafeo_common::types::{PropertyKey, Value};
2178        use grafeo_core::index::text::{BM25Config, InvertedIndex};
2179
2180        let mut index = InvertedIndex::new(BM25Config::default());
2181        let prop_key = PropertyKey::new(property);
2182
2183        let nodes = store.nodes_by_label(label);
2184        for node_id in nodes {
2185            if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
2186                index.insert(node_id, text.as_str());
2187            }
2188        }
2189
2190        store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
2191        Ok(())
2192    }
2193
2194    /// Stub for when text-index feature is not enabled.
2195    #[cfg(all(feature = "lpg", feature = "gql", not(feature = "text-index")))]
2196    fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
2197        Err(grafeo_common::utils::error::Error::Internal(
2198            "Text index support requires the 'text-index' feature".to_string(),
2199        ))
2200    }
2201
2202    /// Returns a table of all indexes from the catalog.
2203    fn execute_show_indexes(&self) -> Result<QueryResult> {
2204        let indexes = self.catalog.all_indexes();
2205        let columns = vec![
2206            "name".to_string(),
2207            "type".to_string(),
2208            "label".to_string(),
2209            "property".to_string(),
2210        ];
2211        let rows: Vec<Vec<Value>> = indexes
2212            .into_iter()
2213            .map(|def| {
2214                let label_name = self
2215                    .catalog
2216                    .get_label_name(def.label)
2217                    .unwrap_or_else(|| "?".into());
2218                let prop_name = self
2219                    .catalog
2220                    .get_property_key_name(def.property_key)
2221                    .unwrap_or_else(|| "?".into());
2222                vec![
2223                    Value::from(def.name),
2224                    Value::from(format!("{:?}", def.index_type)),
2225                    Value::from(&*label_name),
2226                    Value::from(&*prop_name),
2227                ]
2228            })
2229            .collect();
2230        Ok(QueryResult {
2231            columns,
2232            column_types: Vec::new(),
2233            rows,
2234            ..QueryResult::empty()
2235        })
2236    }
2237
2238    /// Returns a table of all constraints (currently metadata-only).
2239    fn execute_show_constraints(&self) -> Result<QueryResult> {
2240        // Constraints are tracked in WAL but not yet in a queryable catalog.
2241        // Return an empty table with the expected schema.
2242        Ok(QueryResult {
2243            columns: vec![
2244                "name".to_string(),
2245                "type".to_string(),
2246                "label".to_string(),
2247                "properties".to_string(),
2248            ],
2249            column_types: Vec::new(),
2250            rows: Vec::new(),
2251            ..QueryResult::empty()
2252        })
2253    }
2254
2255    /// Returns a table of all registered node types in the current schema.
2256    fn execute_show_node_types(&self) -> Result<QueryResult> {
2257        let columns = vec![
2258            "name".to_string(),
2259            "properties".to_string(),
2260            "constraints".to_string(),
2261            "parents".to_string(),
2262        ];
2263        let schema = self.current_schema.lock().clone();
2264        let all_names = self.catalog.all_node_type_names();
2265        let type_names: Vec<String> = match &schema {
2266            Some(s) => {
2267                let prefix = format!("{s}/");
2268                all_names
2269                    .into_iter()
2270                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2271                    .collect()
2272            }
2273            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2274        };
2275        let rows: Vec<Vec<Value>> = type_names
2276            .into_iter()
2277            .filter_map(|name| {
2278                let lookup = match &schema {
2279                    Some(s) => format!("{s}/{name}"),
2280                    None => name.clone(),
2281                };
2282                let def = self.catalog.get_node_type(&lookup)?;
2283                let props: Vec<String> = def
2284                    .properties
2285                    .iter()
2286                    .map(|p| {
2287                        let nullable = if p.nullable { "" } else { " NOT NULL" };
2288                        format!("{} {}{}", p.name, p.data_type, nullable)
2289                    })
2290                    .collect();
2291                let constraints: Vec<String> =
2292                    def.constraints.iter().map(|c| format!("{c:?}")).collect();
2293                let parents = def.parent_types.join(", ");
2294                Some(vec![
2295                    Value::from(name),
2296                    Value::from(props.join(", ")),
2297                    Value::from(constraints.join(", ")),
2298                    Value::from(parents),
2299                ])
2300            })
2301            .collect();
2302        Ok(QueryResult {
2303            columns,
2304            column_types: Vec::new(),
2305            rows,
2306            ..QueryResult::empty()
2307        })
2308    }
2309
2310    /// Returns a table of all registered edge types in the current schema.
2311    fn execute_show_edge_types(&self) -> Result<QueryResult> {
2312        let columns = vec![
2313            "name".to_string(),
2314            "properties".to_string(),
2315            "source_types".to_string(),
2316            "target_types".to_string(),
2317        ];
2318        let schema = self.current_schema.lock().clone();
2319        let all_names = self.catalog.all_edge_type_names();
2320        let type_names: Vec<String> = match &schema {
2321            Some(s) => {
2322                let prefix = format!("{s}/");
2323                all_names
2324                    .into_iter()
2325                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2326                    .collect()
2327            }
2328            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2329        };
2330        let rows: Vec<Vec<Value>> = type_names
2331            .into_iter()
2332            .filter_map(|name| {
2333                let lookup = match &schema {
2334                    Some(s) => format!("{s}/{name}"),
2335                    None => name.clone(),
2336                };
2337                let def = self.catalog.get_edge_type_def(&lookup)?;
2338                let props: Vec<String> = def
2339                    .properties
2340                    .iter()
2341                    .map(|p| {
2342                        let nullable = if p.nullable { "" } else { " NOT NULL" };
2343                        format!("{} {}{}", p.name, p.data_type, nullable)
2344                    })
2345                    .collect();
2346                let src = def.source_node_types.join(", ");
2347                let tgt = def.target_node_types.join(", ");
2348                Some(vec![
2349                    Value::from(name),
2350                    Value::from(props.join(", ")),
2351                    Value::from(src),
2352                    Value::from(tgt),
2353                ])
2354            })
2355            .collect();
2356        Ok(QueryResult {
2357            columns,
2358            column_types: Vec::new(),
2359            rows,
2360            ..QueryResult::empty()
2361        })
2362    }
2363
2364    /// Returns a table of all registered graph types in the current schema.
2365    fn execute_show_graph_types(&self) -> Result<QueryResult> {
2366        let columns = vec![
2367            "name".to_string(),
2368            "open".to_string(),
2369            "node_types".to_string(),
2370            "edge_types".to_string(),
2371        ];
2372        let schema = self.current_schema.lock().clone();
2373        let all_names = self.catalog.all_graph_type_names();
2374        let type_names: Vec<String> = match &schema {
2375            Some(s) => {
2376                let prefix = format!("{s}/");
2377                all_names
2378                    .into_iter()
2379                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2380                    .collect()
2381            }
2382            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2383        };
2384        let rows: Vec<Vec<Value>> = type_names
2385            .into_iter()
2386            .filter_map(|name| {
2387                let lookup = match &schema {
2388                    Some(s) => format!("{s}/{name}"),
2389                    None => name.clone(),
2390                };
2391                let def = self.catalog.get_graph_type_def(&lookup)?;
2392                // Strip schema prefix from allowed type names for display
2393                let strip = |n: &String| -> String {
2394                    match &schema {
2395                        Some(s) => n.strip_prefix(&format!("{s}/")).unwrap_or(n).to_string(),
2396                        None => n.clone(),
2397                    }
2398                };
2399                let node_types: Vec<String> = def.allowed_node_types.iter().map(strip).collect();
2400                let edge_types: Vec<String> = def.allowed_edge_types.iter().map(strip).collect();
2401                Some(vec![
2402                    Value::from(name),
2403                    Value::from(def.open),
2404                    Value::from(node_types.join(", ")),
2405                    Value::from(edge_types.join(", ")),
2406                ])
2407            })
2408            .collect();
2409        Ok(QueryResult {
2410            columns,
2411            column_types: Vec::new(),
2412            rows,
2413            ..QueryResult::empty()
2414        })
2415    }
2416
2417    /// Returns the list of named graphs visible in the current schema context.
2418    ///
2419    /// When a session schema is set, only graphs belonging to that schema are
2420    /// shown (their compound prefix is stripped). When no schema is set, graphs
2421    /// without a schema prefix are shown (the default schema).
2422    #[cfg(feature = "lpg")]
2423    fn execute_show_graphs(&self) -> Result<QueryResult> {
2424        let schema = self.current_schema.lock().clone();
2425        let all_names = self.store.graph_names();
2426
2427        let mut names: Vec<String> = match &schema {
2428            Some(s) => {
2429                let prefix = format!("{s}/");
2430                all_names
2431                    .into_iter()
2432                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2433                    .filter(|n| n != SCHEMA_DEFAULT_GRAPH)
2434                    .collect()
2435            }
2436            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2437        };
2438        names.sort();
2439
2440        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2441        Ok(QueryResult {
2442            columns: vec!["name".to_string()],
2443            column_types: Vec::new(),
2444            rows,
2445            ..QueryResult::empty()
2446        })
2447    }
2448
2449    /// Returns the list of all schema namespaces.
2450    fn execute_show_schemas(&self) -> Result<QueryResult> {
2451        let mut names = self.catalog.schema_names();
2452        names.sort();
2453        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2454        Ok(QueryResult {
2455            columns: vec!["name".to_string()],
2456            column_types: Vec::new(),
2457            rows,
2458            ..QueryResult::empty()
2459        })
2460    }
2461
2462    /// Returns detailed info for a specific graph type.
2463    fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
2464        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
2465
2466        let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
2467            Error::Query(QueryError::new(
2468                QueryErrorKind::Semantic,
2469                format!("Graph type '{name}' not found"),
2470            ))
2471        })?;
2472
2473        let columns = vec![
2474            "name".to_string(),
2475            "open".to_string(),
2476            "node_types".to_string(),
2477            "edge_types".to_string(),
2478        ];
2479        let rows = vec![vec![
2480            Value::from(def.name),
2481            Value::from(def.open),
2482            Value::from(def.allowed_node_types.join(", ")),
2483            Value::from(def.allowed_edge_types.join(", ")),
2484        ]];
2485        Ok(QueryResult {
2486            columns,
2487            column_types: Vec::new(),
2488            rows,
2489            ..QueryResult::empty()
2490        })
2491    }
2492
2493    /// Returns the graph type bound to the current graph.
2494    fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
2495        let graph_name = self
2496            .current_graph()
2497            .unwrap_or_else(|| "default".to_string());
2498        let columns = vec![
2499            "graph".to_string(),
2500            "graph_type".to_string(),
2501            "open".to_string(),
2502            "node_types".to_string(),
2503            "edge_types".to_string(),
2504        ];
2505
2506        if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
2507            && let Some(def) = self.catalog.get_graph_type_def(&type_name)
2508        {
2509            let rows = vec![vec![
2510                Value::from(graph_name),
2511                Value::from(type_name),
2512                Value::from(def.open),
2513                Value::from(def.allowed_node_types.join(", ")),
2514                Value::from(def.allowed_edge_types.join(", ")),
2515            ]];
2516            return Ok(QueryResult {
2517                columns,
2518                column_types: Vec::new(),
2519                rows,
2520                ..QueryResult::empty()
2521            });
2522        }
2523
2524        // No graph type binding found
2525        Ok(QueryResult {
2526            columns,
2527            column_types: Vec::new(),
2528            rows: vec![vec![
2529                Value::from(graph_name),
2530                Value::Null,
2531                Value::Null,
2532                Value::Null,
2533                Value::Null,
2534            ]],
2535            ..QueryResult::empty()
2536        })
2537    }
2538
2539    /// Executes a GQL query.
2540    ///
2541    /// # Errors
2542    ///
2543    /// Returns an error if the query fails to parse or execute.
2544    ///
2545    /// # Examples
2546    ///
2547    /// ```no_run
2548    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2549    /// use grafeo_engine::GrafeoDB;
2550    ///
2551    /// let db = GrafeoDB::new_in_memory();
2552    /// let session = db.session();
2553    ///
2554    /// // Create a node
2555    /// session.execute("INSERT (:Person {name: 'Alix', age: 30})")?;
2556    ///
2557    /// // Query nodes
2558    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
2559    /// for row in result.rows() {
2560    ///     println!("{:?}", row);
2561    /// }
2562    /// # Ok(())
2563    /// # }
2564    /// ```
2565    #[cfg(feature = "gql")]
2566    pub fn execute(&self, query: &str) -> Result<QueryResult> {
2567        self.require_lpg("GQL")?;
2568
2569        #[cfg(feature = "testing-statement-injection")]
2570        grafeo_common::testing::statement_failure::maybe_fail_statement().map_err(|e| {
2571            grafeo_common::utils::error::Error::Internal(format!("injected failure: {e}"))
2572        })?;
2573
2574        use crate::query::{
2575            binder::Binder, cache::CacheKey, optimizer::Optimizer, processor::QueryLanguage,
2576            translators::gql,
2577        };
2578
2579        let _span = grafeo_info_span!(
2580            "grafeo::session::execute",
2581            language = "gql",
2582            query_len = query.len(),
2583        );
2584
2585        #[cfg(not(target_arch = "wasm32"))]
2586        let start_time = std::time::Instant::now();
2587
2588        // Parse and translate, checking for session/schema commands first
2589        let translation = gql::translate_full(query)?;
2590        let logical_plan = match translation {
2591            gql::GqlTranslationResult::SessionCommand(cmd) => {
2592                return self.execute_session_command(cmd);
2593            }
2594            #[cfg(feature = "lpg")]
2595            gql::GqlTranslationResult::SchemaCommand(cmd) => {
2596                // All DDL requires Admin role
2597                self.require_permission(crate::auth::StatementKind::Admin)?;
2598                if *self.read_only_tx.lock() {
2599                    return Err(grafeo_common::utils::error::Error::Transaction(
2600                        grafeo_common::utils::error::TransactionError::ReadOnly,
2601                    ));
2602                }
2603                return self.execute_schema_command(cmd);
2604            }
2605            gql::GqlTranslationResult::Plan(plan) => {
2606                // Only walk the operator tree when it matters: non-admin
2607                // identities need permission checks, read-only transactions
2608                // need mutation blocking. Admin sessions in auto-commit mode
2609                // skip the tree walk entirely.
2610                let read_only = *self.read_only_tx.lock();
2611                let need_check = read_only || !self.identity.can_admin();
2612                let is_mutation = need_check && plan.root.has_mutations();
2613                if is_mutation {
2614                    self.require_permission(crate::auth::StatementKind::Write)?;
2615                }
2616                if read_only && is_mutation {
2617                    return Err(grafeo_common::utils::error::Error::Transaction(
2618                        grafeo_common::utils::error::TransactionError::ReadOnly,
2619                    ));
2620                }
2621                plan
2622            }
2623            #[cfg(not(feature = "lpg"))]
2624            gql::GqlTranslationResult::SchemaCommand(_) => {
2625                return Err(grafeo_common::utils::error::Error::Internal(
2626                    "Schema commands require the `lpg` feature".to_string(),
2627                ));
2628            }
2629        };
2630
2631        // Create cache key for this query
2632        let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
2633
2634        // Try to get cached optimized plan, or use the plan we just translated
2635        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2636            cached_plan
2637        } else {
2638            // Semantic validation
2639            let mut binder = Binder::new();
2640            let _binding_context = binder.bind(&logical_plan)?;
2641
2642            // Optimize the plan
2643            let active = self.active_store();
2644            let optimizer = Optimizer::from_graph_store(&*active);
2645            let plan = optimizer.optimize(logical_plan)?;
2646
2647            // Cache the optimized plan for future use
2648            self.query_cache.put_optimized(cache_key, plan.clone());
2649
2650            plan
2651        };
2652
2653        // Resolve the active store for query execution
2654        let active = self.active_store();
2655
2656        // EXPLAIN: annotate pushdown hints and return the plan tree
2657        if optimized_plan.explain {
2658            use crate::query::processor::{annotate_pushdown_hints, explain_result};
2659            let mut plan = optimized_plan;
2660            annotate_pushdown_hints(&mut plan.root, active.as_ref());
2661            return Ok(explain_result(&plan));
2662        }
2663
2664        // PROFILE: execute with per-operator instrumentation
2665        if optimized_plan.profile {
2666            let has_mutations = optimized_plan.root.has_mutations();
2667            return self.with_auto_commit(has_mutations, || {
2668                let (viewing_epoch, transaction_id) = self.get_transaction_context();
2669                let planner = self.create_planner_for_store(
2670                    Arc::clone(&active),
2671                    viewing_epoch,
2672                    transaction_id,
2673                );
2674                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2675
2676                let executor = self.make_executor(physical_plan.columns.clone());
2677                let _result = executor.execute(physical_plan.operator.as_mut())?;
2678
2679                let total_time_ms;
2680                #[cfg(not(target_arch = "wasm32"))]
2681                {
2682                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2683                }
2684                #[cfg(target_arch = "wasm32")]
2685                {
2686                    total_time_ms = 0.0;
2687                }
2688
2689                let profile_tree = crate::query::profile::build_profile_tree(
2690                    &optimized_plan.root,
2691                    &mut entries.into_iter(),
2692                );
2693                Ok(crate::query::profile::profile_result(
2694                    &profile_tree,
2695                    total_time_ms,
2696                ))
2697            });
2698        }
2699
2700        let has_mutations = optimized_plan.root.has_mutations();
2701
2702        let result = self.with_auto_commit(has_mutations, || {
2703            // Get transaction context for MVCC visibility
2704            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2705
2706            // Convert to physical plan with transaction context
2707            // (Physical planning cannot be cached as it depends on transaction state)
2708            // Safe to use read-only fast path when: this query has no mutations AND
2709            // there is no active transaction that may have prior uncommitted writes.
2710            let has_active_tx = self.current_transaction.lock().is_some();
2711            let read_only = !has_mutations && !has_active_tx;
2712            let planner = self.create_planner_for_store_with_read_only(
2713                Arc::clone(&active),
2714                viewing_epoch,
2715                transaction_id,
2716                read_only,
2717            );
2718            let physical_plan = planner.plan(&optimized_plan)?;
2719
2720            // Execute the plan via push-based pipeline when possible
2721            let executor = self.make_executor(physical_plan.columns.clone());
2722            let (mut source, push_ops) = {
2723                #[cfg(feature = "spill")]
2724                {
2725                    let memory_ctx = self.make_operator_memory_context();
2726                    grafeo_core::execution::pipeline_convert::convert_to_pipeline_with_memory(
2727                        physical_plan.into_operator(),
2728                        memory_ctx,
2729                    )
2730                }
2731                #[cfg(not(feature = "spill"))]
2732                {
2733                    grafeo_core::execution::pipeline_convert::convert_to_pipeline(
2734                        physical_plan.into_operator(),
2735                    )
2736                }
2737            };
2738            let mut result = if push_ops.is_empty() {
2739                // Pure source query: use traditional pull-based execution
2740                executor.execute(source.as_mut())?
2741            } else {
2742                // Pipeline execution: push data from source through operators
2743                executor.execute_pipeline(source, push_ops)?
2744            };
2745
2746            // Add execution metrics
2747            let rows_scanned = result.rows.len() as u64;
2748            #[cfg(not(target_arch = "wasm32"))]
2749            {
2750                let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2751                result.execution_time_ms = Some(elapsed_ms);
2752            }
2753            result.rows_scanned = Some(rows_scanned);
2754
2755            Ok(result)
2756        });
2757
2758        // Record metrics for this query execution.
2759        #[cfg(feature = "metrics")]
2760        {
2761            #[cfg(not(target_arch = "wasm32"))]
2762            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2763            #[cfg(target_arch = "wasm32")]
2764            let elapsed_ms = None;
2765            self.record_query_metrics("gql", elapsed_ms, &result);
2766        }
2767
2768        result
2769    }
2770
2771    /// Executes a GQL query and returns a lazy result stream.
2772    ///
2773    /// The stream pulls chunks from the operator pipeline on demand. Use it
2774    /// when the result set is too large to fit in memory or when you want
2775    /// first-row latency (process rows as they arrive rather than waiting
2776    /// for the whole query to complete).
2777    ///
2778    /// This is the read-only, pull-only path: `execute_streaming` rejects
2779    /// mutations (INSERT/DELETE/SET), schema/session commands, EXPLAIN,
2780    /// PROFILE, and queries whose planner emits a push-based pipeline. Run
2781    /// those via [`execute`](Self::execute) instead.
2782    ///
2783    /// # Stability: Experimental
2784    ///
2785    /// New in 0.5.40. Signature may change before being promoted to Beta.
2786    ///
2787    /// # Errors
2788    ///
2789    /// Returns an error if parsing or planning fails, if the query is a
2790    /// kind that cannot be streamed, or if permission checks fail.
2791    #[cfg(all(feature = "gql", feature = "lpg"))]
2792    pub fn execute_streaming(
2793        &self,
2794        query: &str,
2795    ) -> Result<crate::query::executor::stream::ResultStream<'_>> {
2796        use crate::query::executor::stream::{ResultStream, StreamGuard};
2797
2798        let (source, columns, deadline) = self.build_streaming_plan(query)?;
2799        let guard = StreamGuard::new(&self.active_streams);
2800        Ok(ResultStream::new(source, columns, deadline, guard))
2801    }
2802
2803    /// Builds a pull-based physical pipeline for streaming, returning the
2804    /// root operator and metadata needed to wrap it in a `ResultStream` or
2805    /// `OwnedResultStream`.
2806    ///
2807    /// Rejects session/schema commands, mutations, EXPLAIN/PROFILE, and plans
2808    /// that require a push-based pipeline. Used by both
2809    /// [`Session::execute_streaming`] and [`GrafeoDB::execute_streaming`].
2810    #[cfg(all(feature = "gql", feature = "lpg"))]
2811    pub(crate) fn build_streaming_plan(
2812        &self,
2813        query: &str,
2814    ) -> Result<(
2815        Box<dyn grafeo_core::execution::operators::Operator>,
2816        Vec<String>,
2817        Option<Instant>,
2818    )> {
2819        use crate::query::{
2820            binder::Binder, cache::CacheKey, optimizer::Optimizer, processor::QueryLanguage,
2821            translators::gql,
2822        };
2823
2824        self.require_lpg("GQL")?;
2825
2826        let _span = grafeo_info_span!(
2827            "grafeo::session::execute_streaming",
2828            language = "gql",
2829            query_len = query.len(),
2830        );
2831
2832        // Parse and translate, rejecting anything that isn't a streamable query.
2833        let translation = gql::translate_full(query)?;
2834        let logical_plan = match translation {
2835            gql::GqlTranslationResult::SessionCommand(_) => {
2836                return Err(grafeo_common::utils::error::Error::Query(
2837                    grafeo_common::utils::error::QueryError::new(
2838                        grafeo_common::utils::error::QueryErrorKind::Semantic,
2839                        "session commands cannot be streamed; use execute() instead",
2840                    ),
2841                ));
2842            }
2843            gql::GqlTranslationResult::SchemaCommand(_) => {
2844                return Err(grafeo_common::utils::error::Error::Query(
2845                    grafeo_common::utils::error::QueryError::new(
2846                        grafeo_common::utils::error::QueryErrorKind::Semantic,
2847                        "schema DDL cannot be streamed; use execute() instead",
2848                    ),
2849                ));
2850            }
2851            gql::GqlTranslationResult::Plan(plan) => {
2852                if plan.root.has_mutations() {
2853                    return Err(grafeo_common::utils::error::Error::Query(
2854                        grafeo_common::utils::error::QueryError::new(
2855                            grafeo_common::utils::error::QueryErrorKind::Semantic,
2856                            "mutating queries cannot be streamed; use execute() instead",
2857                        ),
2858                    ));
2859                }
2860                if !self.identity.can_admin() {
2861                    self.require_permission(crate::auth::StatementKind::Read)?;
2862                }
2863                plan
2864            }
2865        };
2866
2867        // Cache + bind + optimize (same path as execute).
2868        let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
2869        let optimized_plan = if let Some(cached) = self.query_cache.get_optimized(&cache_key) {
2870            cached
2871        } else {
2872            let mut binder = Binder::new();
2873            let _binding_context = binder.bind(&logical_plan)?;
2874            let active = self.active_store();
2875            let optimizer = Optimizer::from_graph_store(&*active);
2876            let plan = optimizer.optimize(logical_plan)?;
2877            self.query_cache.put_optimized(cache_key, plan.clone());
2878            plan
2879        };
2880
2881        if optimized_plan.explain || optimized_plan.profile {
2882            return Err(grafeo_common::utils::error::Error::Query(
2883                grafeo_common::utils::error::QueryError::new(
2884                    grafeo_common::utils::error::QueryErrorKind::Semantic,
2885                    "EXPLAIN and PROFILE cannot be streamed; use execute() instead",
2886                ),
2887            ));
2888        }
2889
2890        // Plan to physical operators.
2891        let active = self.active_store();
2892        let has_active_tx = self.current_transaction.lock().is_some();
2893        let (viewing_epoch, transaction_id) = self.get_transaction_context();
2894        let planner = self.create_planner_for_store_with_read_only(
2895            Arc::clone(&active),
2896            viewing_epoch,
2897            transaction_id,
2898            !has_active_tx,
2899        );
2900        let physical_plan = planner.plan(&optimized_plan)?;
2901        let columns = physical_plan.columns.clone();
2902
2903        // Streaming only supports the pure pull path. Reject plans that would
2904        // require push operators (Sort, Aggregate, Distinct, etc.) so we never
2905        // silently materialize under the hood.
2906        let (source, push_ops) = {
2907            #[cfg(feature = "spill")]
2908            {
2909                let memory_ctx = self.make_operator_memory_context();
2910                grafeo_core::execution::pipeline_convert::convert_to_pipeline_with_memory(
2911                    physical_plan.into_operator(),
2912                    memory_ctx,
2913                )
2914            }
2915            #[cfg(not(feature = "spill"))]
2916            {
2917                grafeo_core::execution::pipeline_convert::convert_to_pipeline(
2918                    physical_plan.into_operator(),
2919                )
2920            }
2921        };
2922        if !push_ops.is_empty() {
2923            return Err(grafeo_common::utils::error::Error::Query(
2924                grafeo_common::utils::error::QueryError::new(
2925                    grafeo_common::utils::error::QueryErrorKind::Semantic,
2926                    "query requires a push-based pipeline (ORDER BY / aggregate / DISTINCT) \
2927                     which cannot be streamed; use execute() instead",
2928                ),
2929            ));
2930        }
2931
2932        Ok((source, columns, self.query_deadline()))
2933    }
2934
2935    /// Executes a GQL query with visibility at the specified epoch.
2936    ///
2937    /// This enables time-travel queries: the query sees the database
2938    /// as it existed at the given epoch.
2939    ///
2940    /// # Errors
2941    ///
2942    /// Returns an error if parsing or execution fails.
2943    #[cfg(feature = "gql")]
2944    pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
2945        let previous = self.viewing_epoch_override.lock().replace(epoch);
2946        let result = self.execute(query);
2947        *self.viewing_epoch_override.lock() = previous;
2948        result
2949    }
2950
2951    /// Executes a GQL query at a specific epoch with optional parameters.
2952    ///
2953    /// Combines epoch-based time travel with parameterized queries.
2954    ///
2955    /// # Errors
2956    ///
2957    /// Returns an error if parsing or execution fails.
2958    #[cfg(feature = "gql")]
2959    pub fn execute_at_epoch_with_params(
2960        &self,
2961        query: &str,
2962        epoch: EpochId,
2963        params: Option<std::collections::HashMap<String, Value>>,
2964    ) -> Result<QueryResult> {
2965        let previous = self.viewing_epoch_override.lock().replace(epoch);
2966        let result = if let Some(p) = params {
2967            self.execute_with_params(query, p)
2968        } else {
2969            self.execute(query)
2970        };
2971        *self.viewing_epoch_override.lock() = previous;
2972        result
2973    }
2974
2975    /// Executes a GQL query with parameters.
2976    ///
2977    /// # Errors
2978    ///
2979    /// Returns an error if the query fails to parse or execute.
2980    #[cfg(feature = "gql")]
2981    pub fn execute_with_params(
2982        &self,
2983        query: &str,
2984        params: std::collections::HashMap<String, Value>,
2985    ) -> Result<QueryResult> {
2986        self.require_lpg("GQL")?;
2987
2988        use crate::query::processor::{QueryLanguage, QueryProcessor};
2989
2990        // Reject writes if the identity lacks permission. Parse the query
2991        // to determine mutation status reliably (the text heuristic has false
2992        // negatives that could bypass authorization).
2993        let has_mutations = if self.identity.can_write() {
2994            // Fast path: identity can write, use heuristic for auto-commit only
2995            Self::query_looks_like_mutation(query)
2996        } else {
2997            // Restricted identity: parse to check mutations reliably
2998            use crate::query::translators::gql;
2999            match gql::translate(query) {
3000                Ok(plan) if plan.root.has_mutations() => {
3001                    self.require_permission(crate::auth::StatementKind::Write)?;
3002                    true
3003                }
3004                Ok(_) => false,
3005                // Parse error: let the processor handle it below
3006                Err(_) => Self::query_looks_like_mutation(query),
3007            }
3008        };
3009        let active = self.active_store();
3010
3011        self.with_auto_commit(has_mutations, || {
3012            // Get transaction context for MVCC visibility
3013            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3014
3015            // Create processor with transaction context
3016            let processor = QueryProcessor::for_stores_with_transaction(
3017                Arc::clone(&active),
3018                self.active_write_store(),
3019                Arc::clone(&self.transaction_manager),
3020            )?;
3021
3022            // Apply transaction context if in a transaction
3023            let processor = if let Some(transaction_id) = transaction_id {
3024                processor.with_transaction_context(viewing_epoch, transaction_id)
3025            } else {
3026                processor
3027            };
3028
3029            processor.process(query, QueryLanguage::Gql, Some(&params))
3030        })
3031    }
3032
3033    /// Executes a GQL query with parameters.
3034    ///
3035    /// # Errors
3036    ///
3037    /// Returns an error if no query language is enabled.
3038    #[cfg(not(any(feature = "gql", feature = "cypher")))]
3039    pub fn execute_with_params(
3040        &self,
3041        _query: &str,
3042        _params: std::collections::HashMap<String, Value>,
3043    ) -> Result<QueryResult> {
3044        Err(grafeo_common::utils::error::Error::Internal(
3045            "No query language enabled".to_string(),
3046        ))
3047    }
3048
3049    /// Executes a GQL query.
3050    ///
3051    /// # Errors
3052    ///
3053    /// Returns an error if no query language is enabled.
3054    #[cfg(not(any(feature = "gql", feature = "cypher")))]
3055    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
3056        Err(grafeo_common::utils::error::Error::Internal(
3057            "No query language enabled".to_string(),
3058        ))
3059    }
3060
3061    /// Executes a Cypher query.
3062    ///
3063    /// # Errors
3064    ///
3065    /// Returns an error if the query fails to parse or execute.
3066    #[cfg(feature = "cypher")]
3067    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
3068        use crate::query::{
3069            binder::Binder, cache::CacheKey, optimizer::Optimizer, processor::QueryLanguage,
3070            translators::cypher,
3071        };
3072
3073        // Handle schema DDL and SHOW commands before the normal query path
3074        let translation = cypher::translate_full(query)?;
3075        match translation {
3076            #[cfg(feature = "lpg")]
3077            cypher::CypherTranslationResult::SchemaCommand(cmd) => {
3078                use grafeo_common::utils::error::{
3079                    Error as GrafeoError, QueryError, QueryErrorKind,
3080                };
3081                self.require_permission(crate::auth::StatementKind::Admin)?;
3082                if *self.read_only_tx.lock() {
3083                    return Err(GrafeoError::Query(QueryError::new(
3084                        QueryErrorKind::Semantic,
3085                        "Cannot execute schema DDL in a read-only transaction",
3086                    )));
3087                }
3088                return self.execute_schema_command(cmd);
3089            }
3090            #[cfg(not(feature = "lpg"))]
3091            cypher::CypherTranslationResult::SchemaCommand(_) => {
3092                return Err(grafeo_common::utils::error::Error::Internal(
3093                    "Schema DDL requires the `lpg` feature".to_string(),
3094                ));
3095            }
3096            cypher::CypherTranslationResult::ShowIndexes => {
3097                return self.execute_show_indexes();
3098            }
3099            cypher::CypherTranslationResult::ShowConstraints => {
3100                return self.execute_show_constraints();
3101            }
3102            cypher::CypherTranslationResult::ShowCurrentGraphType => {
3103                return self.execute_show_current_graph_type();
3104            }
3105            cypher::CypherTranslationResult::Plan(_) => {
3106                // Fall through to normal execution below
3107            }
3108        }
3109
3110        #[cfg(not(target_arch = "wasm32"))]
3111        let start_time = std::time::Instant::now();
3112
3113        // Create cache key for this query
3114        let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());
3115
3116        // Try to get cached optimized plan
3117        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
3118            cached_plan
3119        } else {
3120            // Parse and translate the query to a logical plan
3121            let logical_plan = cypher::translate(query)?;
3122
3123            // Semantic validation
3124            let mut binder = Binder::new();
3125            let _binding_context = binder.bind(&logical_plan)?;
3126
3127            // Optimize the plan
3128            let active = self.active_store();
3129            let optimizer = Optimizer::from_graph_store(&*active);
3130            let plan = optimizer.optimize(logical_plan)?;
3131
3132            // Cache the optimized plan
3133            self.query_cache.put_optimized(cache_key, plan.clone());
3134
3135            plan
3136        };
3137
3138        // Check role-based permission for mutations
3139        if optimized_plan.root.has_mutations() {
3140            self.require_permission(crate::auth::StatementKind::Write)?;
3141        }
3142
3143        // Resolve the active store for query execution
3144        let active = self.active_store();
3145
3146        // EXPLAIN
3147        if optimized_plan.explain {
3148            use crate::query::processor::{annotate_pushdown_hints, explain_result};
3149            let mut plan = optimized_plan;
3150            annotate_pushdown_hints(&mut plan.root, active.as_ref());
3151            return Ok(explain_result(&plan));
3152        }
3153
3154        // PROFILE
3155        if optimized_plan.profile {
3156            let has_mutations = optimized_plan.root.has_mutations();
3157            return self.with_auto_commit(has_mutations, || {
3158                let (viewing_epoch, transaction_id) = self.get_transaction_context();
3159                let planner = self.create_planner_for_store(
3160                    Arc::clone(&active),
3161                    viewing_epoch,
3162                    transaction_id,
3163                );
3164                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
3165
3166                let executor = self.make_executor(physical_plan.columns.clone());
3167                let _result = executor.execute(physical_plan.operator.as_mut())?;
3168
3169                let total_time_ms;
3170                #[cfg(not(target_arch = "wasm32"))]
3171                {
3172                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
3173                }
3174                #[cfg(target_arch = "wasm32")]
3175                {
3176                    total_time_ms = 0.0;
3177                }
3178
3179                let profile_tree = crate::query::profile::build_profile_tree(
3180                    &optimized_plan.root,
3181                    &mut entries.into_iter(),
3182                );
3183                Ok(crate::query::profile::profile_result(
3184                    &profile_tree,
3185                    total_time_ms,
3186                ))
3187            });
3188        }
3189
3190        let has_mutations = optimized_plan.root.has_mutations();
3191
3192        let result = self.with_auto_commit(has_mutations, || {
3193            // Get transaction context for MVCC visibility
3194            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3195
3196            // Convert to physical plan with transaction context
3197            let planner =
3198                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3199            let mut physical_plan = planner.plan(&optimized_plan)?;
3200
3201            // Execute the plan
3202            let executor = self.make_executor(physical_plan.columns.clone());
3203            executor.execute(physical_plan.operator.as_mut())
3204        });
3205
3206        #[cfg(feature = "metrics")]
3207        {
3208            #[cfg(not(target_arch = "wasm32"))]
3209            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3210            #[cfg(target_arch = "wasm32")]
3211            let elapsed_ms = None;
3212            self.record_query_metrics("cypher", elapsed_ms, &result);
3213        }
3214
3215        result
3216    }
3217
3218    /// Executes a Gremlin query.
3219    ///
3220    /// # Errors
3221    ///
3222    /// Returns an error if the query fails to parse or execute.
3223    ///
3224    /// # Examples
3225    ///
3226    /// ```no_run
3227    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3228    /// use grafeo_engine::GrafeoDB;
3229    ///
3230    /// let db = GrafeoDB::new_in_memory();
3231    /// let session = db.session();
3232    ///
3233    /// // Create some nodes first
3234    /// session.create_node(&["Person"]);
3235    ///
3236    /// // Query using Gremlin
3237    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
3238    /// # Ok(())
3239    /// # }
3240    /// ```
3241    #[cfg(feature = "gremlin")]
3242    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
3243        use crate::query::{binder::Binder, optimizer::Optimizer, translators::gremlin};
3244
3245        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3246        let start_time = Instant::now();
3247
3248        // Parse and translate the query to a logical plan
3249        let logical_plan = gremlin::translate(query)?;
3250
3251        // Semantic validation
3252        let mut binder = Binder::new();
3253        let _binding_context = binder.bind(&logical_plan)?;
3254
3255        // Optimize the plan
3256        let active = self.active_store();
3257        let optimizer = Optimizer::from_graph_store(&*active);
3258        let optimized_plan = optimizer.optimize(logical_plan)?;
3259
3260        let has_mutations = optimized_plan.root.has_mutations();
3261        if has_mutations {
3262            self.require_permission(crate::auth::StatementKind::Write)?;
3263        }
3264
3265        let result = self.with_auto_commit(has_mutations, || {
3266            // Get transaction context for MVCC visibility
3267            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3268
3269            // Convert to physical plan with transaction context
3270            let planner =
3271                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3272            let mut physical_plan = planner.plan(&optimized_plan)?;
3273
3274            // Execute the plan
3275            let executor = self.make_executor(physical_plan.columns.clone());
3276            executor.execute(physical_plan.operator.as_mut())
3277        });
3278
3279        #[cfg(feature = "metrics")]
3280        {
3281            #[cfg(not(target_arch = "wasm32"))]
3282            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3283            #[cfg(target_arch = "wasm32")]
3284            let elapsed_ms = None;
3285            self.record_query_metrics("gremlin", elapsed_ms, &result);
3286        }
3287
3288        result
3289    }
3290
3291    /// Executes a Gremlin query with parameters.
3292    ///
3293    /// # Errors
3294    ///
3295    /// Returns an error if the query fails to parse or execute.
3296    #[cfg(feature = "gremlin")]
3297    pub fn execute_gremlin_with_params(
3298        &self,
3299        query: &str,
3300        params: std::collections::HashMap<String, Value>,
3301    ) -> Result<QueryResult> {
3302        use crate::query::{
3303            binder::Binder, optimizer::Optimizer, processor::substitute_params,
3304            translators::gremlin,
3305        };
3306
3307        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3308        let start_time = Instant::now();
3309
3310        // Parse and translate the query to a logical plan
3311        let mut logical_plan = gremlin::translate(query)?;
3312
3313        // Substitute parameters
3314        substitute_params(&mut logical_plan, &params)?;
3315
3316        // Semantic validation
3317        let mut binder = Binder::new();
3318        let _binding_context = binder.bind(&logical_plan)?;
3319
3320        // Optimize the plan
3321        let active = self.active_store();
3322        let optimizer = Optimizer::from_graph_store(&*active);
3323        let optimized_plan = optimizer.optimize(logical_plan)?;
3324
3325        let has_mutations = optimized_plan.root.has_mutations();
3326        if has_mutations {
3327            self.require_permission(crate::auth::StatementKind::Write)?;
3328        }
3329
3330        let result = self.with_auto_commit(has_mutations, || {
3331            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3332            let planner =
3333                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3334            let mut physical_plan = planner.plan(&optimized_plan)?;
3335            let executor = self.make_executor(physical_plan.columns.clone());
3336            executor.execute(physical_plan.operator.as_mut())
3337        });
3338
3339        #[cfg(feature = "metrics")]
3340        {
3341            #[cfg(not(target_arch = "wasm32"))]
3342            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3343            #[cfg(target_arch = "wasm32")]
3344            let elapsed_ms = None;
3345            self.record_query_metrics("gremlin", elapsed_ms, &result);
3346        }
3347
3348        result
3349    }
3350
3351    /// Executes a GraphQL query against the LPG store.
3352    ///
3353    /// # Errors
3354    ///
3355    /// Returns an error if the query fails to parse or execute.
3356    ///
3357    /// # Examples
3358    ///
3359    /// ```no_run
3360    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3361    /// use grafeo_engine::GrafeoDB;
3362    ///
3363    /// let db = GrafeoDB::new_in_memory();
3364    /// let session = db.session();
3365    ///
3366    /// // Create some nodes first
3367    /// session.create_node(&["User"]);
3368    ///
3369    /// // Query using GraphQL
3370    /// let result = session.execute_graphql("query { user { id name } }")?;
3371    /// # Ok(())
3372    /// # }
3373    /// ```
3374    #[cfg(feature = "graphql")]
3375    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
3376        use crate::query::{
3377            binder::Binder, optimizer::Optimizer, processor::substitute_params,
3378            translators::graphql,
3379        };
3380
3381        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3382        let start_time = Instant::now();
3383
3384        let mut logical_plan = graphql::translate(query)?;
3385
3386        // Substitute default parameter values from variable declarations
3387        if !logical_plan.default_params.is_empty() {
3388            let defaults = logical_plan.default_params.clone();
3389            substitute_params(&mut logical_plan, &defaults)?;
3390        }
3391
3392        let mut binder = Binder::new();
3393        let _binding_context = binder.bind(&logical_plan)?;
3394
3395        let active = self.active_store();
3396        let optimizer = Optimizer::from_graph_store(&*active);
3397        let optimized_plan = optimizer.optimize(logical_plan)?;
3398        let has_mutations = optimized_plan.root.has_mutations();
3399        if has_mutations {
3400            self.require_permission(crate::auth::StatementKind::Write)?;
3401        }
3402
3403        let result = self.with_auto_commit(has_mutations, || {
3404            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3405            let planner =
3406                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3407            let mut physical_plan = planner.plan(&optimized_plan)?;
3408            let executor = self.make_executor(physical_plan.columns.clone());
3409            executor.execute(physical_plan.operator.as_mut())
3410        });
3411
3412        #[cfg(feature = "metrics")]
3413        {
3414            #[cfg(not(target_arch = "wasm32"))]
3415            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3416            #[cfg(target_arch = "wasm32")]
3417            let elapsed_ms = None;
3418            self.record_query_metrics("graphql", elapsed_ms, &result);
3419        }
3420
3421        result
3422    }
3423
3424    /// Executes a GraphQL query with parameters.
3425    ///
3426    /// # Errors
3427    ///
3428    /// Returns an error if the query fails to parse or execute.
3429    #[cfg(feature = "graphql")]
3430    pub fn execute_graphql_with_params(
3431        &self,
3432        query: &str,
3433        params: std::collections::HashMap<String, Value>,
3434    ) -> Result<QueryResult> {
3435        use crate::query::{
3436            binder::Binder, optimizer::Optimizer, processor::substitute_params,
3437            translators::graphql,
3438        };
3439
3440        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3441        let start_time = Instant::now();
3442
3443        // Parse and translate the query to a logical plan
3444        let mut logical_plan = graphql::translate(query)?;
3445
3446        // Merge default params with caller-supplied params
3447        if !logical_plan.default_params.is_empty() {
3448            let mut merged = logical_plan.default_params.clone();
3449            merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
3450            substitute_params(&mut logical_plan, &merged)?;
3451        } else {
3452            substitute_params(&mut logical_plan, &params)?;
3453        }
3454
3455        // Semantic validation
3456        let mut binder = Binder::new();
3457        let _binding_context = binder.bind(&logical_plan)?;
3458
3459        // Optimize the plan
3460        let active = self.active_store();
3461        let optimizer = Optimizer::from_graph_store(&*active);
3462        let optimized_plan = optimizer.optimize(logical_plan)?;
3463
3464        let has_mutations = optimized_plan.root.has_mutations();
3465        if has_mutations {
3466            self.require_permission(crate::auth::StatementKind::Write)?;
3467        }
3468
3469        let result = self.with_auto_commit(has_mutations, || {
3470            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3471            let planner =
3472                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3473            let mut physical_plan = planner.plan(&optimized_plan)?;
3474            let executor = self.make_executor(physical_plan.columns.clone());
3475            executor.execute(physical_plan.operator.as_mut())
3476        });
3477
3478        #[cfg(feature = "metrics")]
3479        {
3480            #[cfg(not(target_arch = "wasm32"))]
3481            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3482            #[cfg(target_arch = "wasm32")]
3483            let elapsed_ms = None;
3484            self.record_query_metrics("graphql", elapsed_ms, &result);
3485        }
3486
3487        result
3488    }
3489
3490    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
3491    ///
3492    /// # Errors
3493    ///
3494    /// Returns an error if the query fails to parse or execute.
3495    ///
3496    /// # Examples
3497    ///
3498    /// ```no_run
3499    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3500    /// use grafeo_engine::GrafeoDB;
3501    ///
3502    /// let db = GrafeoDB::new_in_memory();
3503    /// let session = db.session();
3504    ///
3505    /// let result = session.execute_sql(
3506    ///     "SELECT * FROM GRAPH_TABLE (
3507    ///         MATCH (n:Person)
3508    ///         COLUMNS (n.name AS name)
3509    ///     )"
3510    /// )?;
3511    /// # Ok(())
3512    /// # }
3513    /// ```
3514    #[cfg(feature = "sql-pgq")]
3515    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
3516        use crate::query::{
3517            binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
3518            processor::QueryLanguage, translators::sql_pgq,
3519        };
3520
3521        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3522        let start_time = Instant::now();
3523
3524        // Parse and translate (always needed to check for DDL)
3525        let logical_plan = sql_pgq::translate(query)?;
3526
3527        // Handle DDL statements directly (they don't go through the query pipeline)
3528        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
3529            self.require_permission(crate::auth::StatementKind::Admin)?;
3530            return Ok(QueryResult {
3531                columns: vec!["status".into()],
3532                column_types: vec![grafeo_common::types::LogicalType::String],
3533                rows: vec![vec![Value::from(format!(
3534                    "Property graph '{}' created ({} node tables, {} edge tables)",
3535                    cpg.name,
3536                    cpg.node_tables.len(),
3537                    cpg.edge_tables.len()
3538                ))]],
3539                execution_time_ms: None,
3540                rows_scanned: None,
3541                status_message: None,
3542                gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
3543            });
3544        }
3545
3546        let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());
3547
3548        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
3549            cached_plan
3550        } else {
3551            let mut binder = Binder::new();
3552            let _binding_context = binder.bind(&logical_plan)?;
3553            let active = self.active_store();
3554            let optimizer = Optimizer::from_graph_store(&*active);
3555            let plan = optimizer.optimize(logical_plan)?;
3556            self.query_cache.put_optimized(cache_key, plan.clone());
3557            plan
3558        };
3559
3560        let active = self.active_store();
3561        let has_mutations = optimized_plan.root.has_mutations();
3562        if has_mutations {
3563            self.require_permission(crate::auth::StatementKind::Write)?;
3564        }
3565
3566        let result = self.with_auto_commit(has_mutations, || {
3567            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3568            let planner =
3569                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3570            let mut physical_plan = planner.plan(&optimized_plan)?;
3571            let executor = self.make_executor(physical_plan.columns.clone());
3572            executor.execute(physical_plan.operator.as_mut())
3573        });
3574
3575        #[cfg(feature = "metrics")]
3576        {
3577            #[cfg(not(target_arch = "wasm32"))]
3578            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3579            #[cfg(target_arch = "wasm32")]
3580            let elapsed_ms = None;
3581            self.record_query_metrics("sql", elapsed_ms, &result);
3582        }
3583
3584        result
3585    }
3586
3587    /// Executes a SQL/PGQ query with parameters.
3588    ///
3589    /// # Errors
3590    ///
3591    /// Returns an error if the query fails to parse or execute.
3592    #[cfg(feature = "sql-pgq")]
3593    pub fn execute_sql_with_params(
3594        &self,
3595        query: &str,
3596        params: std::collections::HashMap<String, Value>,
3597    ) -> Result<QueryResult> {
3598        use crate::query::processor::{QueryLanguage, QueryProcessor};
3599
3600        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3601        let start_time = Instant::now();
3602
3603        let has_mutations = if self.identity.can_write() {
3604            Self::query_looks_like_mutation(query)
3605        } else {
3606            use crate::query::translators::sql_pgq;
3607            match sql_pgq::translate(query) {
3608                Ok(plan) if plan.root.has_mutations() => {
3609                    self.require_permission(crate::auth::StatementKind::Write)?;
3610                    true
3611                }
3612                Ok(_) => false,
3613                Err(_) => Self::query_looks_like_mutation(query),
3614            }
3615        };
3616        if has_mutations {
3617            self.require_permission(crate::auth::StatementKind::Write)?;
3618        }
3619        let active = self.active_store();
3620
3621        let result = self.with_auto_commit(has_mutations, || {
3622            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3623            let processor = QueryProcessor::for_stores_with_transaction(
3624                Arc::clone(&active),
3625                self.active_write_store(),
3626                Arc::clone(&self.transaction_manager),
3627            )?;
3628            let processor = if let Some(transaction_id) = transaction_id {
3629                processor.with_transaction_context(viewing_epoch, transaction_id)
3630            } else {
3631                processor
3632            };
3633            processor.process(query, QueryLanguage::SqlPgq, Some(&params))
3634        });
3635
3636        #[cfg(feature = "metrics")]
3637        {
3638            #[cfg(not(target_arch = "wasm32"))]
3639            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3640            #[cfg(target_arch = "wasm32")]
3641            let elapsed_ms = None;
3642            self.record_query_metrics("sql", elapsed_ms, &result);
3643        }
3644
3645        result
3646    }
3647
3648    /// Executes a query in the specified language by name.
3649    ///
3650    /// Supported language names: `"gql"`, `"cypher"`, `"gremlin"`, `"graphql"`,
3651    /// `"graphql-rdf"`, `"sparql"`, `"sql"`. Each requires the corresponding feature flag.
3652    ///
3653    /// # Errors
3654    ///
3655    /// Returns an error if the language is unknown/disabled or the query fails.
3656    pub fn execute_language(
3657        &self,
3658        query: &str,
3659        language: &str,
3660        params: Option<std::collections::HashMap<String, Value>>,
3661    ) -> Result<QueryResult> {
3662        let _span = grafeo_info_span!(
3663            "grafeo::session::execute",
3664            language,
3665            query_len = query.len(),
3666        );
3667        match language {
3668            "gql" => {
3669                if let Some(p) = params {
3670                    self.execute_with_params(query, p)
3671                } else {
3672                    self.execute(query)
3673                }
3674            }
3675            #[cfg(feature = "cypher")]
3676            "cypher" => {
3677                if let Some(p) = params {
3678                    use crate::query::processor::{QueryLanguage, QueryProcessor};
3679
3680                    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3681                    let start_time = Instant::now();
3682
3683                    let has_mutations = if self.identity.can_write() {
3684                        Self::query_looks_like_mutation(query)
3685                    } else {
3686                        use crate::query::translators::cypher;
3687                        match cypher::translate(query) {
3688                            Ok(plan) if plan.root.has_mutations() => {
3689                                self.require_permission(crate::auth::StatementKind::Write)?;
3690                                true
3691                            }
3692                            Ok(_) => false,
3693                            Err(_) => Self::query_looks_like_mutation(query),
3694                        }
3695                    };
3696                    let active = self.active_store();
3697                    let result = self.with_auto_commit(has_mutations, || {
3698                        let processor = QueryProcessor::for_stores_with_transaction(
3699                            Arc::clone(&active),
3700                            self.active_write_store(),
3701                            Arc::clone(&self.transaction_manager),
3702                        )?;
3703                        let (viewing_epoch, transaction_id) = self.get_transaction_context();
3704                        let processor = if let Some(transaction_id) = transaction_id {
3705                            processor.with_transaction_context(viewing_epoch, transaction_id)
3706                        } else {
3707                            processor
3708                        };
3709                        processor.process(query, QueryLanguage::Cypher, Some(&p))
3710                    });
3711
3712                    #[cfg(feature = "metrics")]
3713                    {
3714                        #[cfg(not(target_arch = "wasm32"))]
3715                        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3716                        #[cfg(target_arch = "wasm32")]
3717                        let elapsed_ms = None;
3718                        self.record_query_metrics("cypher", elapsed_ms, &result);
3719                    }
3720
3721                    result
3722                } else {
3723                    self.execute_cypher(query)
3724                }
3725            }
3726            #[cfg(feature = "gremlin")]
3727            "gremlin" => {
3728                if let Some(p) = params {
3729                    self.execute_gremlin_with_params(query, p)
3730                } else {
3731                    self.execute_gremlin(query)
3732                }
3733            }
3734            #[cfg(feature = "graphql")]
3735            "graphql" => {
3736                if let Some(p) = params {
3737                    self.execute_graphql_with_params(query, p)
3738                } else {
3739                    self.execute_graphql(query)
3740                }
3741            }
3742            #[cfg(all(feature = "graphql", feature = "triple-store"))]
3743            "graphql-rdf" => {
3744                if let Some(p) = params {
3745                    self.execute_graphql_rdf_with_params(query, p)
3746                } else {
3747                    self.execute_graphql_rdf(query)
3748                }
3749            }
3750            #[cfg(feature = "sql-pgq")]
3751            "sql" | "sql-pgq" => {
3752                if let Some(p) = params {
3753                    self.execute_sql_with_params(query, p)
3754                } else {
3755                    self.execute_sql(query)
3756                }
3757            }
3758            #[cfg(all(feature = "sparql", feature = "triple-store"))]
3759            "sparql" => {
3760                if let Some(p) = params {
3761                    self.execute_sparql_with_params(query, p)
3762                } else {
3763                    self.execute_sparql(query)
3764                }
3765            }
3766            other => Err(grafeo_common::utils::error::Error::Query(
3767                grafeo_common::utils::error::QueryError::new(
3768                    grafeo_common::utils::error::QueryErrorKind::Semantic,
3769                    format!("Unknown query language: '{other}'"),
3770                ),
3771            )),
3772        }
3773    }
3774
    /// Clears all cached query plans.
    ///
    /// The plan cache is shared across all sessions on the same database,
    /// so clearing from one session affects all sessions.
    pub fn clear_plan_cache(&self) {
        self.query_cache.clear();
    }

    /// Begins a new transaction on this session.
    ///
    /// Uses the default isolation level (`SnapshotIsolation`).
    ///
    /// If a transaction is already active on this session, no second
    /// transaction is started. Instead the call opens a nested scope backed by
    /// an automatically named savepoint, which a later
    /// [`commit`](Self::commit) or [`rollback`](Self::rollback) resolves.
    ///
    /// # Errors
    ///
    /// Returns an error if a transaction is already active and the nested
    /// auto-savepoint cannot be created.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use grafeo_engine::GrafeoDB;
    ///
    /// let db = GrafeoDB::new_in_memory();
    /// let mut session = db.session();
    ///
    /// session.begin_transaction()?;
    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
    /// session.execute("INSERT (:Person {name: 'Gus'})")?;
    /// session.commit()?; // Both inserts committed atomically
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(feature = "lpg")]
    pub fn begin_transaction(&mut self) -> Result<()> {
        // `false` = not read-only; `None` = use the manager's default isolation.
        self.begin_transaction_inner(false, None)
    }
3816
    /// Begins a transaction with a specific isolation level.
    ///
    /// See [`begin_transaction`](Self::begin_transaction) for the default (`SnapshotIsolation`).
    ///
    /// If a transaction is already active on this session, no new transaction
    /// is started: the call opens a nested scope backed by an auto-savepoint
    /// and `isolation_level` is not applied.
    ///
    /// # Errors
    ///
    /// Returns an error if a transaction is already active and the nested
    /// auto-savepoint cannot be created.
    #[cfg(feature = "lpg")]
    pub fn begin_transaction_with_isolation(
        &mut self,
        isolation_level: crate::transaction::IsolationLevel,
    ) -> Result<()> {
        // `false` = not read-only.
        self.begin_transaction_inner(false, Some(isolation_level))
    }
3831
3832    /// Core transaction begin logic, usable from both `&mut self` and `&self` paths.
3833    #[cfg(feature = "lpg")]
3834    fn begin_transaction_inner(
3835        &self,
3836        read_only: bool,
3837        isolation_level: Option<crate::transaction::IsolationLevel>,
3838    ) -> Result<()> {
3839        let _span = grafeo_debug_span!("grafeo::tx::begin", read_only);
3840        let mut current = self.current_transaction.lock();
3841        if current.is_some() {
3842            // Nested transaction: create an auto-savepoint instead of a new tx.
3843            drop(current);
3844            let mut depth = self.transaction_nesting_depth.lock();
3845            *depth += 1;
3846            let sp_name = format!("_nested_tx_{}", *depth);
3847            self.savepoint(&sp_name)?;
3848            return Ok(());
3849        }
3850
3851        let active = self.active_lpg_store();
3852        self.transaction_start_node_count
3853            .store(active.node_count(), Ordering::Relaxed);
3854        self.transaction_start_edge_count
3855            .store(active.edge_count(), Ordering::Relaxed);
3856        let transaction_id = if let Some(level) = isolation_level {
3857            self.transaction_manager.begin_with_isolation(level)
3858        } else {
3859            self.transaction_manager.begin()
3860        };
3861        *current = Some(transaction_id);
3862        *self.read_only_tx.lock() = read_only || self.db_read_only;
3863
3864        // Record the initial graph as "touched" for cross-graph atomicity.
3865        // Uses the full storage key (schema/graph) for schema-scoped resolution.
3866        let key = self.active_graph_storage_key();
3867        let mut touched = self.touched_graphs.lock();
3868        touched.clear();
3869        touched.push(key);
3870
3871        #[cfg(feature = "metrics")]
3872        {
3873            crate::metrics::record_metric!(self.metrics, tx_active, inc);
3874            #[cfg(not(target_arch = "wasm32"))]
3875            {
3876                *self.tx_start_time.lock() = Some(Instant::now());
3877            }
3878        }
3879
3880        Ok(())
3881    }
3882
    /// Commits the current transaction.
    ///
    /// Makes all changes since [`begin_transaction`](Self::begin_transaction) permanent.
    ///
    /// When called inside a nested transaction (one opened while another was
    /// already active), only the innermost auto-savepoint is released and the
    /// outer transaction stays open.
    ///
    /// # Errors
    ///
    /// Returns an error if no transaction is active, if the active-stream
    /// check fails, or if the transaction manager rejects the commit (in which
    /// case the transaction is ended and its changes are rolled back).
    #[cfg(feature = "lpg")]
    pub fn commit(&mut self) -> Result<()> {
        // Thin `&mut self` entry point; the logic lives in `commit_inner`.
        self.commit_inner()
    }
3894
    /// Core commit logic, usable from both `&mut self` and `&self` paths.
    ///
    /// Order of operations for a top-level commit:
    ///
    /// 1. Reject the commit if `check_no_active_streams` fails.
    /// 2. Take the current transaction id and the list of touched graphs.
    /// 3. Ask the transaction manager to commit; on failure, roll back the
    ///    touched stores, reset session state, and return the error.
    /// 4. Finalize version epochs, commit property changes, flush CDC events,
    ///    write WAL markers, and sync store epochs.
    /// 5. Reset the read-only flag and savepoints, optionally run GC, and
    ///    record metrics.
    ///
    /// Inside a nested transaction (nesting depth > 0) none of the above runs:
    /// the auto-savepoint for the current depth is released and the depth is
    /// decremented.
    ///
    /// # Errors
    ///
    /// Returns an error if streams are still active, if no transaction is
    /// active, if releasing a nested savepoint fails, or if the transaction
    /// manager rejects the commit.
    #[cfg(feature = "lpg")]
    fn commit_inner(&self) -> Result<()> {
        let _span = grafeo_debug_span!("grafeo::tx::commit");

        #[cfg(feature = "testing-statement-injection")]
        if let Err(e) = grafeo_common::testing::statement_failure::maybe_fail_commit() {
            // Commit fails before any state is finalized. Treat it like any
            // other pre-prepare commit failure and auto-rollback so the
            // session returns to a clean, consistent state (matches real
            // DB semantics: commit failure implies the transaction is
            // aborted, not left in-flight).
            // The rollback result is deliberately ignored: the injected
            // failure is the error reported to the caller.
            let _ = self.rollback_inner();
            return Err(grafeo_common::utils::error::Error::Internal(format!(
                "injected commit failure: {e}"
            )));
        }

        self.check_no_active_streams("commit")?;
        // Nested transaction: release the auto-savepoint (changes are preserved).
        {
            let mut depth = self.transaction_nesting_depth.lock();
            if *depth > 0 {
                // Name must match the one built in `begin_transaction_inner`.
                let sp_name = format!("_nested_tx_{depth}");
                *depth -= 1;
                // Release the depth lock before calling into savepoint handling.
                drop(depth);
                return self.release_savepoint(&sp_name);
            }
        }

        // `take()` clears the session's current transaction even if the
        // commit below fails; the failure path does not restore it.
        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
            grafeo_common::utils::error::Error::Transaction(
                grafeo_common::utils::error::TransactionError::InvalidState(
                    "No active transaction".to_string(),
                ),
            )
        })?;

        // Validate the transaction first (conflict detection) before committing data.
        // If this fails, we rollback the data changes instead of making them permanent.
        //
        // Take ownership of the touched graphs in one lock acquisition. Since
        // current_transaction was .take()'d above, no concurrent thread can call
        // track_graph_touch() for this transaction (it checks current_transaction
        // first), so this is safe.
        let touched = std::mem::take(&mut *self.touched_graphs.lock());
        let commit_epoch = match self.transaction_manager.commit(transaction_id) {
            Ok(epoch) => epoch,
            Err(e) => {
                // Conflict detected: rollback the data changes
                // NOTE(review): this path calls `rollback_transaction_properties`
                // only, whereas `rollback_inner` calls
                // `discard_uncommitted_versions`; confirm uncommitted versions
                // are cleaned up elsewhere when a commit is rejected.
                for graph_name in &touched {
                    let store = self.resolve_store(graph_name);
                    store.rollback_transaction_properties(transaction_id);
                }
                #[cfg(feature = "triple-store")]
                self.rollback_rdf_transaction(transaction_id);
                // Discard buffered CDC events on conflict rollback
                #[cfg(feature = "cdc")]
                if let Some(ref pending) = self.cdc_pending_events {
                    pending.lock().clear();
                }
                *self.read_only_tx.lock() = self.db_read_only;
                self.savepoints.lock().clear();
                // `touched_graphs` was already emptied by `mem::take` above, so
                // this clear is expected to be a no-op.
                self.touched_graphs.lock().clear();
                #[cfg(feature = "metrics")]
                {
                    crate::metrics::record_metric!(self.metrics, tx_active, dec);
                    crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
                    #[cfg(not(target_arch = "wasm32"))]
                    if let Some(start) = self.tx_start_time.lock().take() {
                        let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
                        crate::metrics::record_metric!(
                            self.metrics,
                            tx_duration,
                            observe duration_ms
                        );
                    }
                }
                return Err(e);
            }
        };

        // Finalize PENDING epochs: make uncommitted versions visible at the commit epoch.
        for graph_name in &touched {
            let store = self.resolve_store(graph_name);
            store.finalize_version_epochs(transaction_id, commit_epoch);
        }

        // Commit succeeded: discard undo logs (make changes permanent)
        #[cfg(feature = "triple-store")]
        self.commit_rdf_transaction(transaction_id);

        for graph_name in &touched {
            let store = self.resolve_store(graph_name);
            store.commit_transaction_properties(transaction_id);
        }

        // Flush buffered CDC events now that the transaction is committed.
        // All buffered events have PENDING epoch; assign the real commit_epoch.
        // Uses record_batch to acquire the write lock once per commit.
        #[cfg(feature = "cdc")]
        if let Some(ref pending) = self.cdc_pending_events {
            // Drain into a local Vec so the pending-buffer lock is released
            // before the events are handed to the CDC log.
            let events: Vec<crate::cdc::ChangeEvent> = pending.lock().drain(..).collect();
            self.cdc_log.record_batch(events.into_iter().map(|mut e| {
                e.epoch = commit_epoch;
                e
            }));
        }

        // Log transaction commit and epoch advance to WAL so that crash
        // recovery can identify committed transactions and their epoch
        // boundaries. Without these markers, WAL recovery discards all
        // records as uncommitted (fixes #252 for the crash scenario).
        // WAL write failures are logged as warnings and do not fail the commit.
        #[cfg(feature = "wal")]
        if let Some(ref wal) = self.wal {
            use grafeo_storage::wal::WalRecord;
            if let Err(e) = wal.log(&WalRecord::TransactionCommit { transaction_id }) {
                grafeo_warn!("Failed to log transaction commit to WAL: {}", e);
            }
            if let Err(e) = wal.log(&WalRecord::EpochAdvance {
                epoch: commit_epoch,
            }) {
                grafeo_warn!("Failed to log epoch advance to WAL: {}", e);
            }
        }

        // Sync epoch for all touched graphs so that convenience lookups
        // (edge_type, get_edge, get_node) can see versions at the latest epoch.
        let current_epoch = self.transaction_manager.current_epoch();
        for graph_name in &touched {
            let store = self.resolve_store(graph_name);
            store.sync_epoch(current_epoch);
        }

        // Reset read-only flag and clear savepoints.
        // touched_graphs was already emptied by mem::take above.
        *self.read_only_tx.lock() = self.db_read_only;
        self.savepoints.lock().clear();

        // Auto-GC: periodically prune old MVCC versions
        // A `gc_interval` of 0 disables auto-GC; otherwise it runs on every
        // `gc_interval`-th commit counted by `commit_counter`.
        if self.gc_interval > 0 {
            // `fetch_add` returns the previous value, hence the `+ 1`.
            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
            if count.is_multiple_of(self.gc_interval) {
                #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
                let gc_start = std::time::Instant::now();

                // Only the graphs touched by this transaction are pruned here.
                let min_epoch = self.transaction_manager.min_active_epoch();
                for graph_name in &touched {
                    let store = self.resolve_store(graph_name);
                    store.gc_versions(min_epoch);
                }
                self.transaction_manager.gc();

                #[cfg(feature = "metrics")]
                {
                    crate::metrics::record_metric!(self.metrics, gc_runs, inc);
                    #[cfg(not(target_arch = "wasm32"))]
                    {
                        let gc_duration_ms = gc_start.elapsed().as_secs_f64() * 1000.0;
                        crate::metrics::record_metric!(
                            self.metrics,
                            gc_duration,
                            observe gc_duration_ms
                        );
                    }
                }
            }
        }

        #[cfg(feature = "metrics")]
        {
            crate::metrics::record_metric!(self.metrics, tx_active, dec);
            crate::metrics::record_metric!(self.metrics, tx_committed, inc);
            #[cfg(not(target_arch = "wasm32"))]
            if let Some(start) = self.tx_start_time.lock().take() {
                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
            }
        }

        Ok(())
    }
4077
    /// Aborts the current transaction.
    ///
    /// Discards all changes since [`begin_transaction`](Self::begin_transaction).
    ///
    /// When called inside a nested transaction (one opened while another was
    /// already active), only the changes made since the innermost nested
    /// begin are rolled back (via its auto-savepoint) and the outer
    /// transaction stays open.
    ///
    /// # Errors
    ///
    /// Returns an error if no transaction is active, if the active-stream
    /// check fails, or if the transaction manager fails to abort the
    /// transaction.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use grafeo_engine::GrafeoDB;
    ///
    /// let db = GrafeoDB::new_in_memory();
    /// let mut session = db.session();
    ///
    /// session.begin_transaction()?;
    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
    /// session.rollback()?; // Insert is discarded
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(feature = "lpg")]
    pub fn rollback(&mut self) -> Result<()> {
        // Thin `&mut self` entry point; the logic lives in `rollback_inner`.
        self.rollback_inner()
    }
4105
4106    /// Core rollback logic, usable from both `&mut self` and `&self` paths.
4107    #[cfg(feature = "lpg")]
4108    fn rollback_inner(&self) -> Result<()> {
4109        let _span = grafeo_debug_span!("grafeo::tx::rollback");
4110        self.check_no_active_streams("rollback")?;
4111        // Nested transaction: rollback to the auto-savepoint.
4112        {
4113            let mut depth = self.transaction_nesting_depth.lock();
4114            if *depth > 0 {
4115                let sp_name = format!("_nested_tx_{depth}");
4116                *depth -= 1;
4117                drop(depth);
4118                return self.rollback_to_savepoint(&sp_name);
4119            }
4120        }
4121
4122        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
4123            grafeo_common::utils::error::Error::Transaction(
4124                grafeo_common::utils::error::TransactionError::InvalidState(
4125                    "No active transaction".to_string(),
4126                ),
4127            )
4128        })?;
4129
4130        // Reset read-only flag
4131        *self.read_only_tx.lock() = self.db_read_only;
4132
4133        // Discard uncommitted versions in ALL touched LPG stores (cross-graph atomicity).
4134        let touched = self.touched_graphs.lock().clone();
4135        for graph_name in &touched {
4136            let store = self.resolve_store(graph_name);
4137            store.discard_uncommitted_versions(transaction_id);
4138        }
4139
4140        // Discard pending operations in the RDF store
4141        #[cfg(feature = "triple-store")]
4142        self.rollback_rdf_transaction(transaction_id);
4143
4144        // Discard buffered CDC events on rollback
4145        #[cfg(feature = "cdc")]
4146        if let Some(ref pending) = self.cdc_pending_events {
4147            pending.lock().clear();
4148        }
4149
4150        // Clear savepoints and touched graphs
4151        self.savepoints.lock().clear();
4152        self.touched_graphs.lock().clear();
4153
4154        // Mark transaction as aborted in the manager
4155        let result = self.transaction_manager.abort(transaction_id);
4156
4157        #[cfg(feature = "metrics")]
4158        if result.is_ok() {
4159            crate::metrics::record_metric!(self.metrics, tx_active, dec);
4160            crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
4161            #[cfg(not(target_arch = "wasm32"))]
4162            if let Some(start) = self.tx_start_time.lock().take() {
4163                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
4164                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
4165            }
4166        }
4167
4168        result
4169    }
4170
4171    /// Creates a named savepoint within the current transaction.
4172    ///
4173    /// The savepoint captures the current node/edge ID counters so that
4174    /// [`rollback_to_savepoint`](Self::rollback_to_savepoint) can discard
4175    /// entities created after this point.
4176    ///
4177    /// # Errors
4178    ///
4179    /// Returns an error if no transaction is active.
4180    #[cfg(feature = "lpg")]
4181    pub fn savepoint(&self, name: &str) -> Result<()> {
4182        let tx_id = self.current_transaction.lock().ok_or_else(|| {
4183            grafeo_common::utils::error::Error::Transaction(
4184                grafeo_common::utils::error::TransactionError::InvalidState(
4185                    "No active transaction".to_string(),
4186                ),
4187            )
4188        })?;
4189
4190        // Capture state for every graph touched so far.
4191        let touched = self.touched_graphs.lock().clone();
4192        let graph_snapshots: Vec<GraphSavepoint> = touched
4193            .iter()
4194            .map(|graph_name| {
4195                let store = self.resolve_store(graph_name);
4196                GraphSavepoint {
4197                    graph_name: graph_name.clone(),
4198                    next_node_id: store.peek_next_node_id(),
4199                    next_edge_id: store.peek_next_edge_id(),
4200                    undo_log_position: store.property_undo_log_position(tx_id),
4201                }
4202            })
4203            .collect();
4204
4205        self.savepoints.lock().push(SavepointState {
4206            name: name.to_string(),
4207            graph_snapshots,
4208            active_graph: self.current_graph.lock().clone(),
4209            #[cfg(feature = "cdc")]
4210            cdc_event_position: self
4211                .cdc_pending_events
4212                .as_ref()
4213                .map_or(0, |p| p.lock().len()),
4214        });
4215        Ok(())
4216    }
4217
4218    /// Rolls back to a named savepoint, undoing all writes made after it.
4219    ///
4220    /// The savepoint and any savepoints created after it are removed.
4221    /// Entities with IDs >= the savepoint snapshot are discarded.
4222    ///
4223    /// # Errors
4224    ///
4225    /// Returns an error if no transaction is active or the savepoint does not exist.
4226    #[cfg(feature = "lpg")]
4227    pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
4228        let transaction_id = self.current_transaction.lock().ok_or_else(|| {
4229            grafeo_common::utils::error::Error::Transaction(
4230                grafeo_common::utils::error::TransactionError::InvalidState(
4231                    "No active transaction".to_string(),
4232                ),
4233            )
4234        })?;
4235
4236        let mut savepoints = self.savepoints.lock();
4237
4238        // Find the savepoint by name (search from the end for nested savepoints)
4239        let pos = savepoints
4240            .iter()
4241            .rposition(|sp| sp.name == name)
4242            .ok_or_else(|| {
4243                grafeo_common::utils::error::Error::Transaction(
4244                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
4245                        "Savepoint '{name}' not found"
4246                    )),
4247                )
4248            })?;
4249
4250        let sp_state = savepoints[pos].clone();
4251
4252        // Remove this savepoint and all later ones
4253        savepoints.truncate(pos);
4254        drop(savepoints);
4255
4256        // Roll back each graph that was captured in the savepoint.
4257        for gs in &sp_state.graph_snapshots {
4258            let store = self.resolve_store(&gs.graph_name);
4259
4260            // Replay property/label undo entries recorded after the savepoint
4261            store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);
4262
4263            // Discard entities created after the savepoint
4264            let current_next_node = store.peek_next_node_id();
4265            let current_next_edge = store.peek_next_edge_id();
4266
4267            let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
4268                .map(NodeId::new)
4269                .collect();
4270            let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
4271                .map(EdgeId::new)
4272                .collect();
4273
4274            if !node_ids.is_empty() || !edge_ids.is_empty() {
4275                store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
4276            }
4277        }
4278
4279        // Also roll back any graphs that were touched AFTER the savepoint
4280        // but not captured in it. These need full discard since the savepoint
4281        // didn't include them.
4282        let touched = self.touched_graphs.lock().clone();
4283        for graph_name in &touched {
4284            let already_captured = sp_state
4285                .graph_snapshots
4286                .iter()
4287                .any(|gs| gs.graph_name == *graph_name);
4288            if !already_captured {
4289                let store = self.resolve_store(graph_name);
4290                store.discard_uncommitted_versions(transaction_id);
4291            }
4292        }
4293
4294        // Truncate CDC event buffer to the savepoint position.
4295        #[cfg(feature = "cdc")]
4296        if let Some(ref pending) = self.cdc_pending_events {
4297            pending.lock().truncate(sp_state.cdc_event_position);
4298        }
4299
4300        // Restore touched_graphs to only the graphs that were known at savepoint time.
4301        let mut touched = self.touched_graphs.lock();
4302        touched.clear();
4303        for gs in &sp_state.graph_snapshots {
4304            if !touched.contains(&gs.graph_name) {
4305                touched.push(gs.graph_name.clone());
4306            }
4307        }
4308
4309        Ok(())
4310    }
4311
4312    /// Releases (removes) a named savepoint without rolling back.
4313    ///
4314    /// # Errors
4315    ///
4316    /// Returns an error if no transaction is active or the savepoint does not exist.
4317    pub fn release_savepoint(&self, name: &str) -> Result<()> {
4318        let _tx_id = self.current_transaction.lock().ok_or_else(|| {
4319            grafeo_common::utils::error::Error::Transaction(
4320                grafeo_common::utils::error::TransactionError::InvalidState(
4321                    "No active transaction".to_string(),
4322                ),
4323            )
4324        })?;
4325
4326        let mut savepoints = self.savepoints.lock();
4327        let pos = savepoints
4328            .iter()
4329            .rposition(|sp| sp.name == name)
4330            .ok_or_else(|| {
4331                grafeo_common::utils::error::Error::Transaction(
4332                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
4333                        "Savepoint '{name}' not found"
4334                    )),
4335                )
4336            })?;
4337        savepoints.remove(pos);
4338        Ok(())
4339    }
4340
4341    /// Returns whether a transaction is active.
4342    #[must_use]
4343    pub fn in_transaction(&self) -> bool {
4344        self.current_transaction.lock().is_some()
4345    }
4346
4347    /// Returns the current transaction ID, if any.
4348    #[must_use]
4349    pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
4350        *self.current_transaction.lock()
4351    }
4352
4353    /// Returns a reference to the transaction manager.
4354    #[must_use]
4355    pub(crate) fn transaction_manager(&self) -> &TransactionManager {
4356        &self.transaction_manager
4357    }
4358
4359    /// Returns the store's current node count and the count at transaction start.
4360    #[cfg(feature = "lpg")]
4361    #[must_use]
4362    pub(crate) fn node_count_delta(&self) -> (usize, usize) {
4363        (
4364            self.transaction_start_node_count.load(Ordering::Relaxed),
4365            self.active_lpg_store().node_count(),
4366        )
4367    }
4368
4369    /// Returns the store's current edge count and the count at transaction start.
4370    #[cfg(feature = "lpg")]
4371    #[must_use]
4372    pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
4373        (
4374            self.transaction_start_edge_count.load(Ordering::Relaxed),
4375            self.active_lpg_store().edge_count(),
4376        )
4377    }
4378
4379    /// Prepares the current transaction for a two-phase commit.
4380    ///
4381    /// Returns a [`PreparedCommit`](crate::transaction::PreparedCommit) that
4382    /// lets you inspect pending changes and attach metadata before finalizing.
4383    /// The mutable borrow prevents concurrent operations while the commit is
4384    /// pending.
4385    ///
4386    /// If the `PreparedCommit` is dropped without calling `commit()` or
4387    /// `abort()`, the transaction is automatically rolled back.
4388    ///
4389    /// # Errors
4390    ///
4391    /// Returns an error if no transaction is active.
4392    ///
4393    /// # Examples
4394    ///
4395    /// ```no_run
4396    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
4397    /// use grafeo_engine::GrafeoDB;
4398    ///
4399    /// let db = GrafeoDB::new_in_memory();
4400    /// let mut session = db.session();
4401    ///
4402    /// session.begin_transaction()?;
4403    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
4404    ///
4405    /// let mut prepared = session.prepare_commit()?;
4406    /// println!("Nodes written: {}", prepared.info().nodes_written);
4407    /// prepared.set_metadata("audit_user", "admin");
4408    /// prepared.commit()?;
4409    /// # Ok(())
4410    /// # }
4411    /// ```
4412    #[cfg(feature = "lpg")]
4413    pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
4414        crate::transaction::PreparedCommit::new(self)
4415    }
4416
4417    /// Sets auto-commit mode.
4418    pub fn set_auto_commit(&mut self, auto_commit: bool) {
4419        self.auto_commit = auto_commit;
4420    }
4421
4422    /// Returns whether auto-commit is enabled.
4423    #[must_use]
4424    pub fn auto_commit(&self) -> bool {
4425        self.auto_commit
4426    }
4427
4428    /// Returns `true` if auto-commit should wrap this execution.
4429    ///
4430    /// Auto-commit kicks in when: the session is in auto-commit mode,
4431    /// no explicit transaction is active, and the query mutates data.
4432    fn needs_auto_commit(&self, has_mutations: bool) -> bool {
4433        self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
4434    }
4435
4436    /// Wraps `body` in an automatic begin/commit when [`needs_auto_commit`]
4437    /// returns `true`. On error the transaction is rolled back.
4438    #[cfg(feature = "lpg")]
4439    fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
4440    where
4441        F: FnOnce() -> Result<QueryResult>,
4442    {
4443        if self.needs_auto_commit(has_mutations) {
4444            self.begin_transaction_inner(false, None)?;
4445            match body() {
4446                Ok(result) => {
4447                    self.commit_inner()?;
4448                    Ok(result)
4449                }
4450                Err(e) => {
4451                    let _ = self.rollback_inner();
4452                    Err(e)
4453                }
4454            }
4455        } else {
4456            body()
4457        }
4458    }
4459
4460    /// Non-LPG stub: no auto-commit wrapping (SPARQL UPDATE is atomic).
4461    #[cfg(not(feature = "lpg"))]
4462    fn with_auto_commit<F>(&self, _has_mutations: bool, body: F) -> Result<QueryResult>
4463    where
4464        F: FnOnce() -> Result<QueryResult>,
4465    {
4466        body()
4467    }
4468
4469    /// Quick heuristic: returns `true` when the query text looks like it
4470    /// performs a mutation. Used by `_with_params` paths that go through the
4471    /// `QueryProcessor` (where the logical plan isn't available before
4472    /// execution). False negatives are harmless: the data just won't be
4473    /// auto-committed, which matches the prior behaviour.
4474    fn query_looks_like_mutation(query: &str) -> bool {
4475        let upper = query.to_ascii_uppercase();
4476        upper.contains("INSERT")
4477            || upper.contains("CREATE")
4478            || upper.contains("DELETE")
4479            || upper.contains("MERGE")
4480            || upper.contains("SET")
4481            || upper.contains("REMOVE")
4482            || upper.contains("DROP")
4483            || upper.contains("ALTER")
4484    }
4485
4486    /// Returns `Err(Transaction(InvalidState))` if any `ResultStream` is
4487    /// still outstanding for this session.
4488    #[cfg(feature = "lpg")]
4489    fn check_no_active_streams(&self, op: &str) -> Result<()> {
4490        if self.active_streams.load(Ordering::Acquire) > 0 {
4491            return Err(grafeo_common::utils::error::Error::Transaction(
4492                grafeo_common::utils::error::TransactionError::InvalidState(format!(
4493                    "Cannot {op} while streaming results are active; drop the stream first"
4494                )),
4495            ));
4496        }
4497        Ok(())
4498    }
4499
4500    /// Computes the wall-clock deadline for query execution.
4501    #[must_use]
4502    fn query_deadline(&self) -> Option<Instant> {
4503        #[cfg(not(target_arch = "wasm32"))]
4504        {
4505            self.query_timeout.map(|d| Instant::now() + d)
4506        }
4507        #[cfg(target_arch = "wasm32")]
4508        {
4509            let _ = &self.query_timeout;
4510            None
4511        }
4512    }
4513
4514    /// Creates an executor with deadline and timeout duration configured.
4515    fn make_executor(&self, columns: Vec<String>) -> Executor {
4516        Executor::with_columns(columns)
4517            .with_deadline(self.query_deadline())
4518            .with_timeout_duration(self.query_timeout)
4519    }
4520
4521    /// Creates a per-query `OperatorMemoryContext` for memory-aware spilling.
4522    ///
4523    /// Returns `None` if the buffer manager is not configured or has no spill path,
4524    /// in which case operators will use row-count fallback thresholds.
4525    #[cfg(feature = "spill")]
4526    fn make_operator_memory_context(
4527        &self,
4528    ) -> Option<grafeo_core::execution::OperatorMemoryContext> {
4529        let bm = self.buffer_manager.as_ref()?;
4530        let spill_path = bm.config().spill_path.as_ref()?;
4531        // Per-query isolation: create a unique subdirectory
4532        let query_id = self
4533            .commit_counter
4534            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
4535        let query_dir = spill_path.join(format!("query_{query_id}"));
4536        let sm = std::sync::Arc::new(grafeo_core::execution::SpillManager::new(&query_dir).ok()?);
4537        Some(grafeo_core::execution::OperatorMemoryContext::new(
4538            std::sync::Arc::clone(bm),
4539            sm,
4540        ))
4541    }
4542
4543    /// Checks that a property value does not exceed the configured size limit.
4544    fn check_property_size(&self, key: &str, value: &Value) -> Result<()> {
4545        if let Some(limit) = self.max_property_size {
4546            let size = value.estimated_size_bytes();
4547            if size > limit {
4548                let limit_display = if limit >= 1024 * 1024 && limit % (1024 * 1024) == 0 {
4549                    format!("{} MiB", limit / (1024 * 1024))
4550                } else if limit >= 1024 && limit % 1024 == 0 {
4551                    format!("{} KiB", limit / 1024)
4552                } else {
4553                    format!("{limit} bytes")
4554                };
4555                return Err(grafeo_common::utils::error::Error::Query(
4556                    grafeo_common::utils::error::QueryError::new(
4557                        grafeo_common::utils::error::QueryErrorKind::Execution,
4558                        format!(
4559                            "Property '{key}' value exceeds maximum size of {limit_display} ({size} bytes)"
4560                        ),
4561                    )
4562                    .with_hint(
4563                        "Increase with Config::with_max_property_size() or disable with Config::without_max_property_size()".to_string(),
4564                    ),
4565                ));
4566            }
4567        }
4568        Ok(())
4569    }
4570
4571    /// Records query metrics for any language.
4572    ///
4573    /// Called after query execution to update counters, latency histogram,
4574    /// and per-language tracking. `elapsed_ms` should be `None` on WASM
4575    /// where `Instant` is unavailable.
4576    #[cfg(feature = "metrics")]
4577    fn record_query_metrics(
4578        &self,
4579        language: &str,
4580        elapsed_ms: Option<f64>,
4581        result: &Result<crate::database::QueryResult>,
4582    ) {
4583        use crate::metrics::record_metric;
4584
4585        record_metric!(self.metrics, query_count, inc);
4586        if let Some(ref reg) = self.metrics {
4587            reg.query_count_by_language.increment(language);
4588        }
4589        if let Some(ms) = elapsed_ms {
4590            record_metric!(self.metrics, query_latency, observe ms);
4591        }
4592        match result {
4593            Ok(r) => {
4594                let returned = r.rows.len() as u64;
4595                record_metric!(self.metrics, rows_returned, add returned);
4596                if let Some(scanned) = r.rows_scanned {
4597                    record_metric!(self.metrics, rows_scanned, add scanned);
4598                }
4599            }
4600            Err(e) => {
4601                record_metric!(self.metrics, query_errors, inc);
4602                // Detect timeout errors
4603                let msg = e.to_string();
4604                if msg.contains("exceeded timeout") {
4605                    record_metric!(self.metrics, query_timeouts, inc);
4606                }
4607            }
4608        }
4609    }
4610
4611    /// Evaluates a simple integer literal from a session parameter expression.
4612    #[cfg(feature = "gql")]
4613    fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
4614        use grafeo_adapters::query::gql::ast::{Expression, Literal};
4615        match expr {
4616            Expression::Literal(Literal::Integer(n)) => Some(*n),
4617            _ => None,
4618        }
4619    }
4620
4621    /// Returns the current transaction context for MVCC visibility.
4622    ///
4623    /// Returns `(viewing_epoch, transaction_id)` where:
4624    /// - `viewing_epoch` is the epoch at which to check version visibility
4625    /// - `transaction_id` is the current transaction ID (if in a transaction)
4626    #[must_use]
4627    fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
4628        // Time-travel override takes precedence (read-only, no tx context)
4629        if let Some(epoch) = *self.viewing_epoch_override.lock() {
4630            return (epoch, None);
4631        }
4632
4633        if let Some(transaction_id) = *self.current_transaction.lock() {
4634            // In a transaction: use the transaction's start epoch
4635            let epoch = self
4636                .transaction_manager
4637                .start_epoch(transaction_id)
4638                .unwrap_or_else(|| self.transaction_manager.current_epoch());
4639            (epoch, Some(transaction_id))
4640        } else {
4641            // No transaction: use current epoch
4642            (self.transaction_manager.current_epoch(), None)
4643        }
4644    }
4645
4646    /// Creates a planner with transaction context and constraint validator.
4647    ///
4648    /// The `store` parameter is the graph store to plan against (use
4649    /// `self.active_store()` for graph-aware execution).
4650    fn create_planner_for_store(
4651        &self,
4652        store: Arc<dyn GraphStoreSearch>,
4653        viewing_epoch: EpochId,
4654        transaction_id: Option<TransactionId>,
4655    ) -> crate::query::Planner {
4656        self.create_planner_for_store_with_read_only(store, viewing_epoch, transaction_id, false)
4657    }
4658
4659    fn create_planner_for_store_with_read_only(
4660        &self,
4661        store: Arc<dyn GraphStoreSearch>,
4662        viewing_epoch: EpochId,
4663        transaction_id: Option<TransactionId>,
4664        read_only: bool,
4665    ) -> crate::query::Planner {
4666        use crate::query::Planner;
4667        use grafeo_core::execution::operators::{LazyValue, SessionContext};
4668
4669        // Capture store reference for lazy introspection (only computed if info()/schema() called).
4670        let info_store = Arc::clone(&store);
4671        let schema_store = Arc::clone(&store);
4672
4673        let session_context = SessionContext {
4674            current_schema: self.current_schema(),
4675            current_graph: self.current_graph(),
4676            db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
4677            schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
4678        };
4679
4680        let write_store = self.active_write_store();
4681
4682        let mut planner = Planner::with_context(
4683            Arc::clone(&store),
4684            write_store,
4685            Arc::clone(&self.transaction_manager),
4686            transaction_id,
4687            viewing_epoch,
4688        )
4689        .with_factorized_execution(self.factorized_execution)
4690        .with_catalog(Arc::clone(&self.catalog))
4691        .with_session_context(session_context)
4692        .with_read_only(read_only);
4693
4694        // Attach the constraint validator for schema enforcement and property size limits
4695        let validator = CatalogConstraintValidator::new(Arc::clone(&self.catalog))
4696            .with_store(store)
4697            .with_max_property_size(self.max_property_size);
4698        planner = planner.with_validator(Arc::new(validator));
4699
4700        planner
4701    }
4702
4703    /// Builds a `Value::Map` for the `info()` introspection function.
4704    fn build_info_value(store: &dyn GraphStore) -> Value {
4705        use grafeo_common::types::PropertyKey;
4706        use std::collections::BTreeMap;
4707
4708        let mut map = BTreeMap::new();
4709        map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
4710        // reason: node/edge counts will not exceed i64::MAX
4711        #[allow(clippy::cast_possible_wrap)]
4712        let node_count = store.node_count() as i64;
4713        // reason: value is a small counter, well within i64::MAX
4714        #[allow(clippy::cast_possible_wrap)]
4715        let edge_count = store.edge_count() as i64;
4716        map.insert(PropertyKey::from("node_count"), Value::Int64(node_count));
4717        map.insert(PropertyKey::from("edge_count"), Value::Int64(edge_count));
4718        map.insert(
4719            PropertyKey::from("version"),
4720            Value::String(env!("CARGO_PKG_VERSION").into()),
4721        );
4722        Value::Map(map.into())
4723    }
4724
4725    /// Builds a `Value::Map` for the `schema()` introspection function.
4726    fn build_schema_value(store: &dyn GraphStore) -> Value {
4727        use grafeo_common::types::PropertyKey;
4728        use std::collections::BTreeMap;
4729
4730        let labels: Vec<Value> = store
4731            .all_labels()
4732            .into_iter()
4733            .map(|l| Value::String(l.into()))
4734            .collect();
4735        let edge_types: Vec<Value> = store
4736            .all_edge_types()
4737            .into_iter()
4738            .map(|t| Value::String(t.into()))
4739            .collect();
4740        let property_keys: Vec<Value> = store
4741            .all_property_keys()
4742            .into_iter()
4743            .map(|k| Value::String(k.into()))
4744            .collect();
4745
4746        let mut map = BTreeMap::new();
4747        map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
4748        map.insert(
4749            PropertyKey::from("edge_types"),
4750            Value::List(edge_types.into()),
4751        );
4752        map.insert(
4753            PropertyKey::from("property_keys"),
4754            Value::List(property_keys.into()),
4755        );
4756        Value::Map(map.into())
4757    }
4758
4759    /// Creates a node directly (bypassing query execution).
4760    ///
4761    /// This is a low-level API for testing and direct manipulation.
4762    /// If a transaction is active, the node will be versioned with the transaction ID.
4763    #[cfg(feature = "lpg")]
4764    pub fn create_node(&self, labels: &[&str]) -> NodeId {
4765        let (epoch, transaction_id) = self.get_transaction_context();
4766        self.active_lpg_store().create_node_versioned(
4767            labels,
4768            epoch,
4769            transaction_id.unwrap_or(TransactionId::SYSTEM),
4770        )
4771    }
4772
4773    /// Creates a node with properties.
4774    ///
4775    /// If a transaction is active, the node will be versioned with the transaction ID.
4776    ///
4777    /// # Errors
4778    ///
4779    /// Returns an error if any property value exceeds the configured `max_property_size`.
4780    #[cfg(feature = "lpg")]
4781    pub fn create_node_with_props<'a>(
4782        &self,
4783        labels: &[&str],
4784        properties: impl IntoIterator<Item = (&'a str, Value)>,
4785    ) -> Result<NodeId> {
4786        let props: Vec<(&str, Value)> = properties.into_iter().collect();
4787        for (key, value) in &props {
4788            self.check_property_size(key, value)?;
4789        }
4790        let (epoch, transaction_id) = self.get_transaction_context();
4791        Ok(self.active_lpg_store().create_node_with_props_versioned(
4792            labels,
4793            props,
4794            epoch,
4795            transaction_id.unwrap_or(TransactionId::SYSTEM),
4796        ))
4797    }
4798
4799    /// Creates an edge between two nodes.
4800    ///
4801    /// This is a low-level API for testing and direct manipulation.
4802    /// If a transaction is active, the edge will be versioned with the transaction ID.
4803    #[cfg(feature = "lpg")]
4804    pub fn create_edge(
4805        &self,
4806        src: NodeId,
4807        dst: NodeId,
4808        edge_type: &str,
4809    ) -> grafeo_common::types::EdgeId {
4810        let (epoch, transaction_id) = self.get_transaction_context();
4811        self.active_lpg_store().create_edge_versioned(
4812            src,
4813            dst,
4814            edge_type,
4815            epoch,
4816            transaction_id.unwrap_or(TransactionId::SYSTEM),
4817        )
4818    }
4819
4820    /// Creates an edge with properties within the active transaction context.
4821    ///
4822    /// # Errors
4823    ///
4824    /// Returns an error if any property value exceeds the configured `max_property_size`.
4825    #[cfg(feature = "lpg")]
4826    pub fn create_edge_with_props<'a>(
4827        &self,
4828        src: NodeId,
4829        dst: NodeId,
4830        edge_type: &str,
4831        properties: impl IntoIterator<Item = (&'a str, Value)>,
4832    ) -> Result<grafeo_common::types::EdgeId> {
4833        let props: Vec<(&str, Value)> = properties.into_iter().collect();
4834        for (key, value) in &props {
4835            self.check_property_size(key, value)?;
4836        }
4837        let (epoch, transaction_id) = self.get_transaction_context();
4838        let tid = transaction_id.unwrap_or(TransactionId::SYSTEM);
4839        let store = self.active_lpg_store();
4840        let eid = store.create_edge_versioned(src, dst, edge_type, epoch, tid);
4841        for (key, value) in props {
4842            store.set_edge_property_versioned(eid, key, value, tid);
4843        }
4844        Ok(eid)
4845    }
4846
4847    /// Sets a node property within the active transaction context.
4848    ///
4849    /// # Errors
4850    ///
4851    /// Returns an error if the value exceeds the configured `max_property_size`.
4852    #[cfg(feature = "lpg")]
4853    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) -> Result<()> {
4854        self.check_property_size(key, &value)?;
4855        let (_, transaction_id) = self.get_transaction_context();
4856        if let Some(tid) = transaction_id {
4857            self.active_lpg_store()
4858                .set_node_property_versioned(id, key, value, tid);
4859        } else {
4860            self.active_lpg_store().set_node_property(id, key, value);
4861        }
4862        Ok(())
4863    }
4864
4865    /// Sets an edge property within the active transaction context.
4866    ///
4867    /// # Errors
4868    ///
4869    /// Returns an error if the value exceeds the configured `max_property_size`.
4870    #[cfg(feature = "lpg")]
4871    pub fn set_edge_property(
4872        &self,
4873        id: grafeo_common::types::EdgeId,
4874        key: &str,
4875        value: Value,
4876    ) -> Result<()> {
4877        self.check_property_size(key, &value)?;
4878        let (_, transaction_id) = self.get_transaction_context();
4879        if let Some(tid) = transaction_id {
4880            self.active_lpg_store()
4881                .set_edge_property_versioned(id, key, value, tid);
4882        } else {
4883            self.active_lpg_store().set_edge_property(id, key, value);
4884        }
4885        Ok(())
4886    }
4887
4888    /// Deletes a node within the active transaction context.
4889    #[cfg(feature = "lpg")]
4890    pub fn delete_node(&self, id: NodeId) -> bool {
4891        let (epoch, transaction_id) = self.get_transaction_context();
4892        if let Some(tid) = transaction_id {
4893            self.active_lpg_store()
4894                .delete_node_versioned(id, epoch, tid)
4895        } else {
4896            self.active_lpg_store().delete_node(id)
4897        }
4898    }
4899
4900    /// Deletes an edge within the active transaction context.
4901    #[cfg(feature = "lpg")]
4902    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
4903        let (epoch, transaction_id) = self.get_transaction_context();
4904        if let Some(tid) = transaction_id {
4905            self.active_lpg_store()
4906                .delete_edge_versioned(id, epoch, tid)
4907        } else {
4908            self.active_lpg_store().delete_edge(id)
4909        }
4910    }
4911
4912    // =========================================================================
4913    // Direct Lookup APIs (bypass query planning for O(1) point reads)
4914    // =========================================================================
4915
4916    /// Gets a node by ID directly, bypassing query planning.
4917    ///
4918    /// This is the fastest way to retrieve a single node when you know its ID.
4919    /// Skips parsing, binding, optimization, and physical planning entirely.
4920    ///
4921    /// # Performance
4922    ///
4923    /// - Time complexity: O(1) average case
4924    /// - No lock contention (uses DashMap internally)
4925    /// - ~20-30x faster than equivalent MATCH query
4926    ///
4927    /// # Example
4928    ///
4929    /// ```no_run
4930    /// # use grafeo_engine::GrafeoDB;
4931    /// # let db = GrafeoDB::new_in_memory();
4932    /// let session = db.session();
4933    /// let node_id = session.create_node(&["Person"]);
4934    ///
4935    /// // Direct lookup - O(1), no query planning
4936    /// let node = session.get_node(node_id);
4937    /// assert!(node.is_some());
4938    /// ```
4939    #[cfg(feature = "lpg")]
4940    #[must_use]
4941    pub fn get_node(&self, id: NodeId) -> Option<Node> {
4942        let (epoch, transaction_id) = self.get_transaction_context();
4943        self.active_lpg_store().get_node_versioned(
4944            id,
4945            epoch,
4946            transaction_id.unwrap_or(TransactionId::SYSTEM),
4947        )
4948    }
4949
4950    /// Gets a single property from a node by ID, bypassing query planning.
4951    ///
4952    /// More efficient than `get_node()` when you only need one property,
4953    /// as it avoids loading the full node with all properties.
4954    ///
4955    /// # Performance
4956    ///
4957    /// - Time complexity: O(1) average case
4958    /// - No query planning overhead
4959    ///
4960    /// # Example
4961    ///
4962    /// ```no_run
4963    /// # use grafeo_engine::GrafeoDB;
4964    /// # use grafeo_common::types::Value;
4965    /// # let db = GrafeoDB::new_in_memory();
4966    /// let session = db.session();
4967    /// let id = session.create_node_with_props(&["Person"], [("name", "Alix".into())]).unwrap();
4968    ///
4969    /// // Direct property access - O(1)
4970    /// let name = session.get_node_property(id, "name");
4971    /// assert_eq!(name, Some(Value::String("Alix".into())));
4972    /// ```
4973    #[cfg(feature = "lpg")]
4974    #[must_use]
4975    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
4976        self.get_node(id)
4977            .and_then(|node| node.get_property(key).cloned())
4978    }
4979
4980    /// Gets an edge by ID directly, bypassing query planning.
4981    ///
4982    /// # Performance
4983    ///
4984    /// - Time complexity: O(1) average case
4985    /// - No lock contention
4986    #[cfg(feature = "lpg")]
4987    #[must_use]
4988    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
4989        let (epoch, transaction_id) = self.get_transaction_context();
4990        self.active_lpg_store().get_edge_versioned(
4991            id,
4992            epoch,
4993            transaction_id.unwrap_or(TransactionId::SYSTEM),
4994        )
4995    }
4996
4997    /// Gets outgoing neighbors of a node directly, bypassing query planning.
4998    ///
4999    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
5000    ///
5001    /// # Performance
5002    ///
5003    /// - Time complexity: O(degree) where degree is the number of outgoing edges
5004    /// - Uses adjacency index for direct access
5005    /// - ~10-20x faster than equivalent MATCH query
5006    ///
5007    /// # Example
5008    ///
5009    /// ```no_run
5010    /// # use grafeo_engine::GrafeoDB;
5011    /// # let db = GrafeoDB::new_in_memory();
5012    /// let session = db.session();
5013    /// let alix = session.create_node(&["Person"]);
5014    /// let gus = session.create_node(&["Person"]);
5015    /// session.create_edge(alix, gus, "KNOWS");
5016    ///
5017    /// // Direct neighbor lookup - O(degree)
5018    /// let neighbors = session.get_neighbors_outgoing(alix);
5019    /// assert_eq!(neighbors.len(), 1);
5020    /// assert_eq!(neighbors[0].0, gus);
5021    /// ```
5022    #[cfg(feature = "lpg")]
5023    #[must_use]
5024    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
5025        self.active_lpg_store()
5026            .edges_from(node, Direction::Outgoing)
5027            .collect()
5028    }
5029
5030    /// Gets incoming neighbors of a node directly, bypassing query planning.
5031    ///
5032    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
5033    ///
5034    /// # Performance
5035    ///
5036    /// - Time complexity: O(degree) where degree is the number of incoming edges
5037    /// - Uses backward adjacency index for direct access
5038    #[cfg(feature = "lpg")]
5039    #[must_use]
5040    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
5041        self.active_lpg_store()
5042            .edges_from(node, Direction::Incoming)
5043            .collect()
5044    }
5045
5046    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
5047    ///
5048    /// # Example
5049    ///
5050    /// ```no_run
5051    /// # use grafeo_engine::GrafeoDB;
5052    /// # let db = GrafeoDB::new_in_memory();
5053    /// # let session = db.session();
5054    /// # let alix = session.create_node(&["Person"]);
5055    /// let neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
5056    /// ```
5057    #[cfg(feature = "lpg")]
5058    #[must_use]
5059    pub fn get_neighbors_outgoing_by_type(
5060        &self,
5061        node: NodeId,
5062        edge_type: &str,
5063    ) -> Vec<(NodeId, EdgeId)> {
5064        self.active_lpg_store()
5065            .edges_from(node, Direction::Outgoing)
5066            .filter(|(_, edge_id)| {
5067                self.get_edge(*edge_id)
5068                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
5069            })
5070            .collect()
5071    }
5072
5073    /// Checks if a node exists, bypassing query planning.
5074    ///
5075    /// # Performance
5076    ///
5077    /// - Time complexity: O(1)
5078    /// - Fastest existence check available
5079    #[cfg(feature = "lpg")]
5080    #[must_use]
5081    pub fn node_exists(&self, id: NodeId) -> bool {
5082        self.get_node(id).is_some()
5083    }
5084
5085    /// Checks if an edge exists, bypassing query planning.
5086    #[cfg(feature = "lpg")]
5087    #[must_use]
5088    pub fn edge_exists(&self, id: EdgeId) -> bool {
5089        self.get_edge(id).is_some()
5090    }
5091
5092    /// Gets the degree (number of edges) of a node.
5093    ///
5094    /// Returns (outgoing_degree, incoming_degree).
5095    #[cfg(feature = "lpg")]
5096    #[must_use]
5097    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
5098        let active = self.active_lpg_store();
5099        let out = active.out_degree(node);
5100        let in_degree = active.in_degree(node);
5101        (out, in_degree)
5102    }
5103
5104    /// Batch lookup of multiple nodes by ID.
5105    ///
5106    /// More efficient than calling `get_node()` in a loop because it
5107    /// amortizes overhead.
5108    ///
5109    /// # Performance
5110    ///
5111    /// - Time complexity: O(n) where n is the number of IDs
5112    /// - Better cache utilization than individual lookups
5113    #[cfg(feature = "lpg")]
5114    #[must_use]
5115    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
5116        let (epoch, transaction_id) = self.get_transaction_context();
5117        let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
5118        let active = self.active_lpg_store();
5119        ids.iter()
5120            .map(|&id| active.get_node_versioned(id, epoch, tx))
5121            .collect()
5122    }
5123
5124    // ── Change Data Capture ─────────────────────────────────────────────
5125
5126    /// Returns the full change history for an entity (node or edge).
5127    ///
5128    /// # Errors
5129    ///
5130    /// Returns an authorization error if the session lacks read permission.
5131    #[cfg(feature = "cdc")]
5132    pub fn history(
5133        &self,
5134        entity_id: impl Into<crate::cdc::EntityId>,
5135    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
5136        self.require_permission(crate::auth::StatementKind::Read)?;
5137        Ok(self.cdc_log.history(entity_id.into()))
5138    }
5139
5140    /// Returns change events for an entity since the given epoch.
5141    ///
5142    /// # Errors
5143    ///
5144    /// Returns an authorization error if the session lacks read permission.
5145    #[cfg(feature = "cdc")]
5146    pub fn history_since(
5147        &self,
5148        entity_id: impl Into<crate::cdc::EntityId>,
5149        since_epoch: EpochId,
5150    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
5151        self.require_permission(crate::auth::StatementKind::Read)?;
5152        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
5153    }
5154
5155    /// Returns all change events across all entities in an epoch range.
5156    ///
5157    /// # Errors
5158    ///
5159    /// Returns an authorization error if the session lacks read permission.
5160    #[cfg(feature = "cdc")]
5161    pub fn changes_between(
5162        &self,
5163        start_epoch: EpochId,
5164        end_epoch: EpochId,
5165    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
5166        self.require_permission(crate::auth::StatementKind::Read)?;
5167        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
5168    }
5169}
5170
5171impl Drop for Session {
5172    fn drop(&mut self) {
5173        // Auto-rollback any active transaction to prevent leaked MVCC state,
5174        // dangling write locks, and uncommitted versions lingering in the store.
5175        #[cfg(feature = "lpg")]
5176        if self.in_transaction() {
5177            let _ = self.rollback_inner();
5178        }
5179
5180        #[cfg(feature = "metrics")]
5181        if let Some(ref reg) = self.metrics {
5182            reg.session_active
5183                .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
5184        }
5185    }
5186}
5187
5188#[cfg(test)]
5189mod tests {
5190    use super::parse_default_literal;
5191    use crate::database::GrafeoDB;
5192    use grafeo_common::types::Value;
5193
5194    // -----------------------------------------------------------------------
5195    // parse_default_literal
5196    // -----------------------------------------------------------------------
5197
5198    #[test]
5199    fn parse_default_literal_null() {
5200        assert_eq!(parse_default_literal("null"), Value::Null);
5201        assert_eq!(parse_default_literal("NULL"), Value::Null);
5202        assert_eq!(parse_default_literal("Null"), Value::Null);
5203    }
5204
5205    #[test]
5206    fn parse_default_literal_bool() {
5207        assert_eq!(parse_default_literal("true"), Value::Bool(true));
5208        assert_eq!(parse_default_literal("TRUE"), Value::Bool(true));
5209        assert_eq!(parse_default_literal("false"), Value::Bool(false));
5210        assert_eq!(parse_default_literal("FALSE"), Value::Bool(false));
5211    }
5212
5213    #[test]
5214    fn parse_default_literal_string_single_quoted() {
5215        assert_eq!(
5216            parse_default_literal("'hello'"),
5217            Value::String("hello".into())
5218        );
5219    }
5220
5221    #[test]
5222    fn parse_default_literal_string_double_quoted() {
5223        assert_eq!(
5224            parse_default_literal("\"world\""),
5225            Value::String("world".into())
5226        );
5227    }
5228
5229    #[test]
5230    fn parse_default_literal_integer() {
5231        assert_eq!(parse_default_literal("42"), Value::Int64(42));
5232        assert_eq!(parse_default_literal("-7"), Value::Int64(-7));
5233        assert_eq!(parse_default_literal("0"), Value::Int64(0));
5234    }
5235
5236    #[test]
5237    fn parse_default_literal_float() {
5238        assert_eq!(parse_default_literal("9.81"), Value::Float64(9.81_f64));
5239        assert_eq!(parse_default_literal("-0.5"), Value::Float64(-0.5));
5240    }
5241
5242    #[test]
5243    fn parse_default_literal_fallback_string() {
5244        // Not a recognized literal, not quoted, not a number
5245        assert_eq!(
5246            parse_default_literal("some_identifier"),
5247            Value::String("some_identifier".into())
5248        );
5249    }
5250
5251    #[test]
5252    fn test_session_create_node() {
5253        let db = GrafeoDB::new_in_memory();
5254        let session = db.session();
5255
5256        let id = session.create_node(&["Person"]);
5257        assert!(id.is_valid());
5258        assert_eq!(db.node_count(), 1);
5259    }
5260
5261    #[test]
5262    fn test_session_transaction() {
5263        let db = GrafeoDB::new_in_memory();
5264        let mut session = db.session();
5265
5266        assert!(!session.in_transaction());
5267
5268        session.begin_transaction().unwrap();
5269        assert!(session.in_transaction());
5270
5271        session.commit().unwrap();
5272        assert!(!session.in_transaction());
5273    }
5274
5275    #[test]
5276    fn test_session_transaction_context() {
5277        let db = GrafeoDB::new_in_memory();
5278        let mut session = db.session();
5279
5280        // Without transaction - context should have current epoch and no transaction_id
5281        let (_epoch1, transaction_id1) = session.get_transaction_context();
5282        assert!(transaction_id1.is_none());
5283
5284        // Start a transaction
5285        session.begin_transaction().unwrap();
5286        let (epoch2, transaction_id2) = session.get_transaction_context();
5287        assert!(transaction_id2.is_some());
5288        // Transaction should have a valid epoch
5289        let _ = epoch2; // Use the variable
5290
5291        // Commit and verify
5292        session.commit().unwrap();
5293        let (epoch3, tx_id3) = session.get_transaction_context();
5294        assert!(tx_id3.is_none());
5295        // Epoch should have advanced after commit
5296        assert!(epoch3.as_u64() >= epoch2.as_u64());
5297    }
5298
5299    #[test]
5300    fn test_session_rollback() {
5301        let db = GrafeoDB::new_in_memory();
5302        let mut session = db.session();
5303
5304        session.begin_transaction().unwrap();
5305        session.rollback().unwrap();
5306        assert!(!session.in_transaction());
5307    }
5308
5309    #[test]
5310    fn test_session_rollback_discards_versions() {
5311        use grafeo_common::types::TransactionId;
5312
5313        let db = GrafeoDB::new_in_memory();
5314
5315        // Create a node outside of any transaction (at system level)
5316        let node_before = db.store().create_node(&["Person"]);
5317        assert!(node_before.is_valid());
5318        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
5319
5320        // Start a transaction
5321        let mut session = db.session();
5322        session.begin_transaction().unwrap();
5323        let transaction_id = session.current_transaction.lock().unwrap();
5324
5325        // Create a node versioned with the transaction's ID
5326        let epoch = db.store().current_epoch();
5327        let node_in_tx = db
5328            .store()
5329            .create_node_versioned(&["Person"], epoch, transaction_id);
5330        assert!(node_in_tx.is_valid());
5331
5332        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
5333        // non-versioned lookups like node_count(). Verify the node is visible
5334        // only through the owning transaction.
5335        assert_eq!(
5336            db.node_count(),
5337            1,
5338            "PENDING nodes should be invisible to non-versioned node_count()"
5339        );
5340        assert!(
5341            db.store()
5342                .get_node_versioned(node_in_tx, epoch, transaction_id)
5343                .is_some(),
5344            "Transaction node should be visible to its own transaction"
5345        );
5346
5347        // Rollback the transaction
5348        session.rollback().unwrap();
5349        assert!(!session.in_transaction());
5350
5351        // The node created in the transaction should be discarded
5352        // Only the first node should remain visible
5353        let count_after = db.node_count();
5354        assert_eq!(
5355            count_after, 1,
5356            "Rollback should discard uncommitted node, but got {count_after}"
5357        );
5358
5359        // The original node should still be accessible
5360        let current_epoch = db.store().current_epoch();
5361        assert!(
5362            db.store()
5363                .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
5364                .is_some(),
5365            "Original node should still exist"
5366        );
5367
5368        // The node created in the transaction should not be accessible
5369        assert!(
5370            db.store()
5371                .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
5372                .is_none(),
5373            "Transaction node should be gone"
5374        );
5375    }
5376
5377    #[test]
5378    fn test_session_create_node_in_transaction() {
5379        // Test that session.create_node() is transaction-aware
5380        let db = GrafeoDB::new_in_memory();
5381
5382        // Create a node outside of any transaction
5383        let node_before = db.create_node(&["Person"]);
5384        assert!(node_before.is_valid());
5385        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
5386
5387        // Start a transaction and create a node through the session
5388        let mut session = db.session();
5389        session.begin_transaction().unwrap();
5390        let transaction_id = session.current_transaction.lock().unwrap();
5391
5392        // Create a node through session.create_node() - should be versioned with tx
5393        let node_in_tx = session.create_node(&["Person"]);
5394        assert!(node_in_tx.is_valid());
5395
5396        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
5397        // non-versioned lookups. Verify the node is visible only to its own tx.
5398        assert_eq!(
5399            db.node_count(),
5400            1,
5401            "PENDING nodes should be invisible to non-versioned node_count()"
5402        );
5403        let epoch = db.store().current_epoch();
5404        assert!(
5405            db.store()
5406                .get_node_versioned(node_in_tx, epoch, transaction_id)
5407                .is_some(),
5408            "Transaction node should be visible to its own transaction"
5409        );
5410
5411        // Rollback the transaction
5412        session.rollback().unwrap();
5413
5414        // The node created via session.create_node() should be discarded
5415        let count_after = db.node_count();
5416        assert_eq!(
5417            count_after, 1,
5418            "Rollback should discard node created via session.create_node(), but got {count_after}"
5419        );
5420    }
5421
5422    #[test]
5423    fn test_session_create_node_with_props_in_transaction() {
5424        use grafeo_common::types::Value;
5425
5426        // Test that session.create_node_with_props() is transaction-aware
5427        let db = GrafeoDB::new_in_memory();
5428
5429        // Create a node outside of any transaction
5430        db.create_node(&["Person"]);
5431        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
5432
5433        // Start a transaction and create a node with properties
5434        let mut session = db.session();
5435        session.begin_transaction().unwrap();
5436        let transaction_id = session.current_transaction.lock().unwrap();
5437
5438        let node_in_tx = session
5439            .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5440            .unwrap();
5441        assert!(node_in_tx.is_valid());
5442
5443        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
5444        // non-versioned lookups. Verify the node is visible only to its own tx.
5445        assert_eq!(
5446            db.node_count(),
5447            1,
5448            "PENDING nodes should be invisible to non-versioned node_count()"
5449        );
5450        let epoch = db.store().current_epoch();
5451        assert!(
5452            db.store()
5453                .get_node_versioned(node_in_tx, epoch, transaction_id)
5454                .is_some(),
5455            "Transaction node should be visible to its own transaction"
5456        );
5457
5458        // Rollback the transaction
5459        session.rollback().unwrap();
5460
5461        // The node should be discarded
5462        let count_after = db.node_count();
5463        assert_eq!(
5464            count_after, 1,
5465            "Rollback should discard node created via session.create_node_with_props()"
5466        );
5467    }
5468
5469    #[cfg(feature = "gql")]
5470    mod gql_tests {
5471        use super::*;
5472
5473        #[test]
5474        fn test_gql_query_execution() {
5475            let db = GrafeoDB::new_in_memory();
5476            let session = db.session();
5477
5478            // Create some test data
5479            session.create_node(&["Person"]);
5480            session.create_node(&["Person"]);
5481            session.create_node(&["Animal"]);
5482
5483            // Execute a GQL query
5484            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
5485
5486            // Should return 2 Person nodes
5487            assert_eq!(result.row_count(), 2);
5488            assert_eq!(result.column_count(), 1);
5489            assert_eq!(result.columns[0], "n");
5490        }
5491
5492        #[test]
5493        fn test_gql_empty_result() {
5494            let db = GrafeoDB::new_in_memory();
5495            let session = db.session();
5496
5497            // No data in database
5498            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
5499
5500            assert_eq!(result.row_count(), 0);
5501        }
5502
5503        #[test]
5504        fn test_gql_parse_error() {
5505            let db = GrafeoDB::new_in_memory();
5506            let session = db.session();
5507
5508            // Invalid GQL syntax
5509            let result = session.execute("MATCH (n RETURN n");
5510
5511            assert!(result.is_err());
5512        }
5513
5514        #[test]
5515        fn test_gql_relationship_traversal() {
5516            let db = GrafeoDB::new_in_memory();
5517            let session = db.session();
5518
5519            // Create a graph: Alix -> Gus, Alix -> Vincent
5520            let alix = session.create_node(&["Person"]);
5521            let gus = session.create_node(&["Person"]);
5522            let vincent = session.create_node(&["Person"]);
5523
5524            session.create_edge(alix, gus, "KNOWS");
5525            session.create_edge(alix, vincent, "KNOWS");
5526
5527            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
5528            let result = session
5529                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
5530                .unwrap();
5531
5532            // Should return 2 rows (Alix->Gus, Alix->Vincent)
5533            assert_eq!(result.row_count(), 2);
5534            assert_eq!(result.column_count(), 2);
5535            assert_eq!(result.columns[0], "a");
5536            assert_eq!(result.columns[1], "b");
5537        }
5538
5539        #[test]
5540        fn test_gql_relationship_with_type_filter() {
5541            let db = GrafeoDB::new_in_memory();
5542            let session = db.session();
5543
5544            // Create a graph: Alix -KNOWS-> Gus, Alix -WORKS_WITH-> Vincent
5545            let alix = session.create_node(&["Person"]);
5546            let gus = session.create_node(&["Person"]);
5547            let vincent = session.create_node(&["Person"]);
5548
5549            session.create_edge(alix, gus, "KNOWS");
5550            session.create_edge(alix, vincent, "WORKS_WITH");
5551
5552            // Query only KNOWS relationships
5553            let result = session
5554                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
5555                .unwrap();
5556
5557            // Should return only 1 row (Alix->Gus)
5558            assert_eq!(result.row_count(), 1);
5559        }
5560
5561        #[test]
5562        fn test_gql_semantic_error_undefined_variable() {
5563            let db = GrafeoDB::new_in_memory();
5564            let session = db.session();
5565
5566            // Reference undefined variable 'x' in RETURN
5567            let result = session.execute("MATCH (n:Person) RETURN x");
5568
5569            // Should fail with semantic error
5570            assert!(result.is_err());
5571            let Err(err) = result else {
5572                panic!("Expected error")
5573            };
5574            assert!(
5575                err.to_string().contains("Undefined variable"),
5576                "Expected undefined variable error, got: {}",
5577                err
5578            );
5579        }
5580
5581        #[test]
5582        fn test_gql_where_clause_property_filter() {
5583            use grafeo_common::types::Value;
5584
5585            let db = GrafeoDB::new_in_memory();
5586            let session = db.session();
5587
5588            // Create people with ages
5589            session
5590                .create_node_with_props(&["Person"], [("age", Value::Int64(25))])
5591                .unwrap();
5592            session
5593                .create_node_with_props(&["Person"], [("age", Value::Int64(35))])
5594                .unwrap();
5595            session
5596                .create_node_with_props(&["Person"], [("age", Value::Int64(45))])
5597                .unwrap();
5598
5599            // Query with WHERE clause: age > 30
5600            let result = session
5601                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
5602                .unwrap();
5603
5604            // Should return 2 people (ages 35 and 45)
5605            assert_eq!(result.row_count(), 2);
5606        }
5607
5608        #[test]
5609        fn test_gql_where_clause_equality() {
5610            use grafeo_common::types::Value;
5611
5612            let db = GrafeoDB::new_in_memory();
5613            let session = db.session();
5614
5615            // Create people with names
5616            session
5617                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5618                .unwrap();
5619            session
5620                .create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))])
5621                .unwrap();
5622            session
5623                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5624                .unwrap();
5625
5626            // Query with WHERE clause: name = "Alix"
5627            let result = session
5628                .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
5629                .unwrap();
5630
5631            // Should return 2 people named Alix
5632            assert_eq!(result.row_count(), 2);
5633        }
5634
5635        #[test]
5636        fn test_gql_return_property_access() {
5637            use grafeo_common::types::Value;
5638
5639            let db = GrafeoDB::new_in_memory();
5640            let session = db.session();
5641
5642            // Create people with names and ages
5643            session
5644                .create_node_with_props(
5645                    &["Person"],
5646                    [
5647                        ("name", Value::String("Alix".into())),
5648                        ("age", Value::Int64(30)),
5649                    ],
5650                )
5651                .unwrap();
5652            session
5653                .create_node_with_props(
5654                    &["Person"],
5655                    [
5656                        ("name", Value::String("Gus".into())),
5657                        ("age", Value::Int64(25)),
5658                    ],
5659                )
5660                .unwrap();
5661
5662            // Query returning properties
5663            let result = session
5664                .execute("MATCH (n:Person) RETURN n.name, n.age")
5665                .unwrap();
5666
5667            // Should return 2 rows with name and age columns
5668            assert_eq!(result.row_count(), 2);
5669            assert_eq!(result.column_count(), 2);
5670            assert_eq!(result.columns[0], "n.name");
5671            assert_eq!(result.columns[1], "n.age");
5672
5673            // Check that we get actual values
5674            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
5675            assert!(names.contains(&&Value::String("Alix".into())));
5676            assert!(names.contains(&&Value::String("Gus".into())));
5677        }
5678
5679        #[test]
5680        fn test_gql_return_mixed_expressions() {
5681            use grafeo_common::types::Value;
5682
5683            let db = GrafeoDB::new_in_memory();
5684            let session = db.session();
5685
5686            // Create a person
5687            session
5688                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5689                .unwrap();
5690
5691            // Query returning both node and property
5692            let result = session
5693                .execute("MATCH (n:Person) RETURN n, n.name")
5694                .unwrap();
5695
5696            assert_eq!(result.row_count(), 1);
5697            assert_eq!(result.column_count(), 2);
5698            assert_eq!(result.columns[0], "n");
5699            assert_eq!(result.columns[1], "n.name");
5700
5701            // Second column should be the name
5702            assert_eq!(result.rows[0][1], Value::String("Alix".into()));
5703        }
5704    }
5705
5706    #[cfg(feature = "cypher")]
5707    mod cypher_tests {
5708        use super::*;
5709
5710        #[test]
5711        fn test_cypher_query_execution() {
5712            let db = GrafeoDB::new_in_memory();
5713            let session = db.session();
5714
5715            // Create some test data
5716            session.create_node(&["Person"]);
5717            session.create_node(&["Person"]);
5718            session.create_node(&["Animal"]);
5719
5720            // Execute a Cypher query
5721            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
5722
5723            // Should return 2 Person nodes
5724            assert_eq!(result.row_count(), 2);
5725            assert_eq!(result.column_count(), 1);
5726            assert_eq!(result.columns[0], "n");
5727        }
5728
5729        #[test]
5730        fn test_cypher_empty_result() {
5731            let db = GrafeoDB::new_in_memory();
5732            let session = db.session();
5733
5734            // No data in database
5735            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
5736
5737            assert_eq!(result.row_count(), 0);
5738        }
5739
5740        #[test]
5741        fn test_cypher_parse_error() {
5742            let db = GrafeoDB::new_in_memory();
5743            let session = db.session();
5744
5745            // Invalid Cypher syntax
5746            let result = session.execute_cypher("MATCH (n RETURN n");
5747
5748            assert!(result.is_err());
5749        }
5750    }
5751
5752    // ==================== Direct Lookup API Tests ====================
5753
5754    mod direct_lookup_tests {
5755        use super::*;
5756        use grafeo_common::types::Value;
5757
5758        #[test]
5759        fn test_get_node() {
5760            let db = GrafeoDB::new_in_memory();
5761            let session = db.session();
5762
5763            let id = session.create_node(&["Person"]);
5764            let node = session.get_node(id);
5765
5766            assert!(node.is_some());
5767            let node = node.unwrap();
5768            assert_eq!(node.id, id);
5769        }
5770
5771        #[test]
5772        fn test_get_node_not_found() {
5773            use grafeo_common::types::NodeId;
5774
5775            let db = GrafeoDB::new_in_memory();
5776            let session = db.session();
5777
5778            // Try to get a non-existent node
5779            let node = session.get_node(NodeId::new(9999));
5780            assert!(node.is_none());
5781        }
5782
5783        #[test]
5784        fn test_get_node_property() {
5785            let db = GrafeoDB::new_in_memory();
5786            let session = db.session();
5787
5788            let id = session
5789                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5790                .unwrap();
5791
5792            let name = session.get_node_property(id, "name");
5793            assert_eq!(name, Some(Value::String("Alix".into())));
5794
5795            // Non-existent property
5796            let missing = session.get_node_property(id, "missing");
5797            assert!(missing.is_none());
5798        }
5799
5800        #[test]
5801        fn test_get_edge() {
5802            let db = GrafeoDB::new_in_memory();
5803            let session = db.session();
5804
5805            let alix = session.create_node(&["Person"]);
5806            let gus = session.create_node(&["Person"]);
5807            let edge_id = session.create_edge(alix, gus, "KNOWS");
5808
5809            let edge = session.get_edge(edge_id);
5810            assert!(edge.is_some());
5811            let edge = edge.unwrap();
5812            assert_eq!(edge.id, edge_id);
5813            assert_eq!(edge.src, alix);
5814            assert_eq!(edge.dst, gus);
5815        }
5816
5817        #[test]
5818        fn test_get_edge_not_found() {
5819            use grafeo_common::types::EdgeId;
5820
5821            let db = GrafeoDB::new_in_memory();
5822            let session = db.session();
5823
5824            let edge = session.get_edge(EdgeId::new(9999));
5825            assert!(edge.is_none());
5826        }
5827
5828        #[test]
5829        fn test_get_neighbors_outgoing() {
5830            let db = GrafeoDB::new_in_memory();
5831            let session = db.session();
5832
5833            let alix = session.create_node(&["Person"]);
5834            let gus = session.create_node(&["Person"]);
5835            let harm = session.create_node(&["Person"]);
5836
5837            session.create_edge(alix, gus, "KNOWS");
5838            session.create_edge(alix, harm, "KNOWS");
5839
5840            let neighbors = session.get_neighbors_outgoing(alix);
5841            assert_eq!(neighbors.len(), 2);
5842
5843            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5844            assert!(neighbor_ids.contains(&gus));
5845            assert!(neighbor_ids.contains(&harm));
5846        }
5847
5848        #[test]
5849        fn test_get_neighbors_incoming() {
5850            let db = GrafeoDB::new_in_memory();
5851            let session = db.session();
5852
5853            let alix = session.create_node(&["Person"]);
5854            let gus = session.create_node(&["Person"]);
5855            let harm = session.create_node(&["Person"]);
5856
5857            session.create_edge(gus, alix, "KNOWS");
5858            session.create_edge(harm, alix, "KNOWS");
5859
5860            let neighbors = session.get_neighbors_incoming(alix);
5861            assert_eq!(neighbors.len(), 2);
5862
5863            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5864            assert!(neighbor_ids.contains(&gus));
5865            assert!(neighbor_ids.contains(&harm));
5866        }
5867
5868        #[test]
5869        fn test_get_neighbors_outgoing_by_type() {
5870            let db = GrafeoDB::new_in_memory();
5871            let session = db.session();
5872
5873            let alix = session.create_node(&["Person"]);
5874            let gus = session.create_node(&["Person"]);
5875            let company = session.create_node(&["Company"]);
5876
5877            session.create_edge(alix, gus, "KNOWS");
5878            session.create_edge(alix, company, "WORKS_AT");
5879
5880            let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
5881            assert_eq!(knows_neighbors.len(), 1);
5882            assert_eq!(knows_neighbors[0].0, gus);
5883
5884            let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
5885            assert_eq!(works_neighbors.len(), 1);
5886            assert_eq!(works_neighbors[0].0, company);
5887
5888            // No edges of this type
5889            let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
5890            assert!(no_neighbors.is_empty());
5891        }
5892
5893        #[test]
5894        fn test_node_exists() {
5895            use grafeo_common::types::NodeId;
5896
5897            let db = GrafeoDB::new_in_memory();
5898            let session = db.session();
5899
5900            let id = session.create_node(&["Person"]);
5901
5902            assert!(session.node_exists(id));
5903            assert!(!session.node_exists(NodeId::new(9999)));
5904        }
5905
5906        #[test]
5907        fn test_edge_exists() {
5908            use grafeo_common::types::EdgeId;
5909
5910            let db = GrafeoDB::new_in_memory();
5911            let session = db.session();
5912
5913            let alix = session.create_node(&["Person"]);
5914            let gus = session.create_node(&["Person"]);
5915            let edge_id = session.create_edge(alix, gus, "KNOWS");
5916
5917            assert!(session.edge_exists(edge_id));
5918            assert!(!session.edge_exists(EdgeId::new(9999)));
5919        }
5920
5921        #[test]
5922        fn test_get_degree() {
5923            let db = GrafeoDB::new_in_memory();
5924            let session = db.session();
5925
5926            let alix = session.create_node(&["Person"]);
5927            let gus = session.create_node(&["Person"]);
5928            let harm = session.create_node(&["Person"]);
5929
5930            // Alix knows Gus and Harm (2 outgoing)
5931            session.create_edge(alix, gus, "KNOWS");
5932            session.create_edge(alix, harm, "KNOWS");
5933            // Gus knows Alix (1 incoming for Alix)
5934            session.create_edge(gus, alix, "KNOWS");
5935
5936            let (out_degree, in_degree) = session.get_degree(alix);
5937            assert_eq!(out_degree, 2);
5938            assert_eq!(in_degree, 1);
5939
5940            // Node with no edges
5941            let lonely = session.create_node(&["Person"]);
5942            let (out, in_deg) = session.get_degree(lonely);
5943            assert_eq!(out, 0);
5944            assert_eq!(in_deg, 0);
5945        }
5946
5947        #[test]
5948        fn test_get_nodes_batch() {
5949            let db = GrafeoDB::new_in_memory();
5950            let session = db.session();
5951
5952            let alix = session.create_node(&["Person"]);
5953            let gus = session.create_node(&["Person"]);
5954            let harm = session.create_node(&["Person"]);
5955
5956            let nodes = session.get_nodes_batch(&[alix, gus, harm]);
5957            assert_eq!(nodes.len(), 3);
5958            assert!(nodes[0].is_some());
5959            assert!(nodes[1].is_some());
5960            assert!(nodes[2].is_some());
5961
5962            // With non-existent node
5963            use grafeo_common::types::NodeId;
5964            let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
5965            assert_eq!(nodes_with_missing.len(), 3);
5966            assert!(nodes_with_missing[0].is_some());
5967            assert!(nodes_with_missing[1].is_none()); // Missing node
5968            assert!(nodes_with_missing[2].is_some());
5969        }
5970
5971        #[test]
5972        fn test_auto_commit_setting() {
5973            let db = GrafeoDB::new_in_memory();
5974            let mut session = db.session();
5975
5976            // Default is auto-commit enabled
5977            assert!(session.auto_commit());
5978
5979            session.set_auto_commit(false);
5980            assert!(!session.auto_commit());
5981
5982            session.set_auto_commit(true);
5983            assert!(session.auto_commit());
5984        }
5985
5986        #[test]
5987        fn test_transaction_double_begin_nests() {
5988            let db = GrafeoDB::new_in_memory();
5989            let mut session = db.session();
5990
5991            session.begin_transaction().unwrap();
5992            // Second begin_transaction creates a nested transaction (auto-savepoint)
5993            let result = session.begin_transaction();
5994            assert!(result.is_ok());
5995            // Commit the inner (releases savepoint)
5996            session.commit().unwrap();
5997            // Commit the outer
5998            session.commit().unwrap();
5999        }
6000
6001        #[test]
6002        fn test_commit_without_transaction_error() {
6003            let db = GrafeoDB::new_in_memory();
6004            let mut session = db.session();
6005
6006            let result = session.commit();
6007            assert!(result.is_err());
6008        }
6009
6010        #[test]
6011        fn test_rollback_without_transaction_error() {
6012            let db = GrafeoDB::new_in_memory();
6013            let mut session = db.session();
6014
6015            let result = session.rollback();
6016            assert!(result.is_err());
6017        }
6018
6019        #[test]
6020        fn test_create_edge_in_transaction() {
6021            let db = GrafeoDB::new_in_memory();
6022            let mut session = db.session();
6023
6024            // Create nodes outside transaction
6025            let alix = session.create_node(&["Person"]);
6026            let gus = session.create_node(&["Person"]);
6027
6028            // Create edge in transaction
6029            session.begin_transaction().unwrap();
6030            let edge_id = session.create_edge(alix, gus, "KNOWS");
6031
6032            // Edge should be visible in the transaction
6033            assert!(session.edge_exists(edge_id));
6034
6035            // Commit
6036            session.commit().unwrap();
6037
6038            // Edge should still be visible
6039            assert!(session.edge_exists(edge_id));
6040        }
6041
6042        #[test]
6043        fn test_neighbors_empty_node() {
6044            let db = GrafeoDB::new_in_memory();
6045            let session = db.session();
6046
6047            let lonely = session.create_node(&["Person"]);
6048
6049            assert!(session.get_neighbors_outgoing(lonely).is_empty());
6050            assert!(session.get_neighbors_incoming(lonely).is_empty());
6051            assert!(
6052                session
6053                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
6054                    .is_empty()
6055            );
6056        }
6057    }
6058
6059    #[test]
6060    fn test_auto_gc_triggers_on_commit_interval() {
6061        use crate::config::Config;
6062
6063        let config = Config::in_memory().with_gc_interval(2);
6064        let db = GrafeoDB::with_config(config).unwrap();
6065        let mut session = db.session();
6066
6067        // First commit: counter = 1, no GC (not a multiple of 2)
6068        session.begin_transaction().unwrap();
6069        session.create_node(&["A"]);
6070        session.commit().unwrap();
6071
6072        // Second commit: counter = 2, GC should trigger (multiple of 2)
6073        session.begin_transaction().unwrap();
6074        session.create_node(&["B"]);
6075        session.commit().unwrap();
6076
6077        // Verify the database is still functional after GC
6078        assert_eq!(db.node_count(), 2);
6079    }
6080
6081    #[test]
6082    fn test_query_timeout_config_propagates_to_session() {
6083        use crate::config::Config;
6084        use std::time::Duration;
6085
6086        let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
6087        let db = GrafeoDB::with_config(config).unwrap();
6088        let session = db.session();
6089
6090        // Verify the session has a query deadline (timeout was set)
6091        assert!(session.query_deadline().is_some());
6092    }
6093
6094    #[test]
6095    fn test_default_query_timeout_returns_deadline() {
6096        let db = GrafeoDB::new_in_memory();
6097        let session = db.session();
6098
6099        // Default config has 30s timeout
6100        assert!(session.query_deadline().is_some());
6101    }
6102
6103    #[test]
6104    fn test_no_query_timeout_returns_no_deadline() {
6105        use crate::config::Config;
6106
6107        let config = Config::in_memory().without_query_timeout();
6108        let db = GrafeoDB::with_config(config).unwrap();
6109        let session = db.session();
6110
6111        assert!(session.query_deadline().is_none());
6112    }
6113
6114    #[test]
6115    fn test_graph_model_accessor() {
6116        use crate::config::GraphModel;
6117
6118        let db = GrafeoDB::new_in_memory();
6119        let session = db.session();
6120
6121        assert_eq!(session.graph_model(), GraphModel::Lpg);
6122    }
6123
6124    #[test]
6125    fn test_reject_oversized_property() {
6126        use crate::config::Config;
6127
6128        let config = Config::in_memory().with_max_property_size(100);
6129        let db = GrafeoDB::with_config(config).unwrap();
6130        let session = db.session();
6131
6132        let node = session.create_node(&["Test"]);
6133
6134        // Small property should succeed
6135        session
6136            .set_node_property(node, "small", Value::from("hello"))
6137            .unwrap();
6138
6139        // Large property should be rejected
6140        let big = "x".repeat(200);
6141        let result = session.set_node_property(node, "big", Value::from(big.as_str()));
6142        assert!(result.is_err());
6143        let err = result.unwrap_err().to_string();
6144        assert!(
6145            err.contains("exceeds maximum size"),
6146            "Expected size error, got: {err}"
6147        );
6148    }
6149
6150    #[test]
6151    fn test_no_property_size_limit() {
6152        use crate::config::Config;
6153
6154        let config = Config::in_memory().without_max_property_size();
6155        let db = GrafeoDB::with_config(config).unwrap();
6156        let session = db.session();
6157
6158        let node = session.create_node(&["Test"]);
6159
6160        // Even large properties should succeed with no limit
6161        let big = "x".repeat(10_000);
6162        session
6163            .set_node_property(node, "big", Value::from(big.as_str()))
6164            .unwrap();
6165    }
6166
6167    #[cfg(feature = "gql")]
6168    #[test]
6169    fn test_external_store_session() {
6170        use grafeo_core::graph::GraphStoreMut;
6171        use std::sync::Arc;
6172
6173        let config = crate::config::Config::in_memory();
6174        let store =
6175            Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
6176        let db = GrafeoDB::with_store(store, config).unwrap();
6177
6178        let mut session = db.session();
6179
6180        // Use an explicit transaction so that INSERT and MATCH share the same
6181        // transaction context. With PENDING epochs, uncommitted versions are
6182        // only visible to the owning transaction.
6183        session.begin_transaction().unwrap();
6184        session.execute("INSERT (:Test {name: 'hello'})").unwrap();
6185
6186        // Verify we can query through it within the same transaction
6187        let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
6188        assert_eq!(result.row_count(), 1);
6189
6190        session.commit().unwrap();
6191    }
6192
6193    // ==================== Session Command Tests ====================
6194
6195    #[cfg(feature = "gql")]
6196    mod session_command_tests {
6197        use super::*;
6198        use grafeo_common::types::Value;
6199
6200        #[test]
6201        fn test_use_graph_sets_current_graph() {
6202            let db = GrafeoDB::new_in_memory();
6203            let session = db.session();
6204
6205            // Create the graph first, then USE it
6206            session.execute("CREATE GRAPH mydb").unwrap();
6207            session.execute("USE GRAPH mydb").unwrap();
6208
6209            assert_eq!(session.current_graph(), Some("mydb".to_string()));
6210        }
6211
6212        #[test]
6213        fn test_use_graph_nonexistent_errors() {
6214            let db = GrafeoDB::new_in_memory();
6215            let session = db.session();
6216
6217            let result = session.execute("USE GRAPH doesnotexist");
6218            assert!(result.is_err());
6219            let err = result.unwrap_err().to_string();
6220            assert!(
6221                err.contains("does not exist"),
6222                "Expected 'does not exist' error, got: {err}"
6223            );
6224        }
6225
6226        #[test]
6227        fn test_use_graph_default_always_valid() {
6228            let db = GrafeoDB::new_in_memory();
6229            let session = db.session();
6230
6231            // "default" is always valid, even without CREATE GRAPH
6232            session.execute("USE GRAPH default").unwrap();
6233            assert_eq!(session.current_graph(), Some("default".to_string()));
6234        }
6235
6236        #[test]
6237        fn test_session_set_graph() {
6238            let db = GrafeoDB::new_in_memory();
6239            let session = db.session();
6240
6241            session.execute("CREATE GRAPH analytics").unwrap();
6242            session.execute("SESSION SET GRAPH analytics").unwrap();
6243            assert_eq!(session.current_graph(), Some("analytics".to_string()));
6244        }
6245
6246        #[test]
6247        fn test_session_set_graph_nonexistent_errors() {
6248            let db = GrafeoDB::new_in_memory();
6249            let session = db.session();
6250
6251            let result = session.execute("SESSION SET GRAPH nosuchgraph");
6252            assert!(result.is_err());
6253        }
6254
6255        #[test]
6256        fn test_session_set_time_zone() {
6257            let db = GrafeoDB::new_in_memory();
6258            let session = db.session();
6259
6260            assert_eq!(session.time_zone(), None);
6261
6262            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
6263            assert_eq!(session.time_zone(), Some("UTC".to_string()));
6264
6265            session
6266                .execute("SESSION SET TIME ZONE 'America/New_York'")
6267                .unwrap();
6268            assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
6269        }
6270
6271        #[test]
6272        fn test_session_set_parameter() {
6273            let db = GrafeoDB::new_in_memory();
6274            let session = db.session();
6275
6276            session
6277                .execute("SESSION SET PARAMETER $timeout = 30")
6278                .unwrap();
6279
6280            // Parameter is stored (value is Null for now, since expression
6281            // evaluation is not yet wired up)
6282            assert!(session.get_parameter("timeout").is_some());
6283        }
6284
6285        #[test]
6286        fn test_session_reset_clears_all_state() {
6287            let db = GrafeoDB::new_in_memory();
6288            let session = db.session();
6289
6290            // Set various session state
6291            session.execute("CREATE GRAPH analytics").unwrap();
6292            session.execute("SESSION SET GRAPH analytics").unwrap();
6293            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
6294            session
6295                .execute("SESSION SET PARAMETER $limit = 100")
6296                .unwrap();
6297
6298            // Verify state was set
6299            assert!(session.current_graph().is_some());
6300            assert!(session.time_zone().is_some());
6301            assert!(session.get_parameter("limit").is_some());
6302
6303            // Reset everything
6304            session.execute("SESSION RESET").unwrap();
6305
6306            assert_eq!(session.current_graph(), None);
6307            assert_eq!(session.time_zone(), None);
6308            assert!(session.get_parameter("limit").is_none());
6309        }
6310
6311        #[test]
6312        fn test_session_close_clears_state() {
6313            let db = GrafeoDB::new_in_memory();
6314            let session = db.session();
6315
6316            session.execute("CREATE GRAPH analytics").unwrap();
6317            session.execute("SESSION SET GRAPH analytics").unwrap();
6318            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
6319
6320            session.execute("SESSION CLOSE").unwrap();
6321
6322            assert_eq!(session.current_graph(), None);
6323            assert_eq!(session.time_zone(), None);
6324        }
6325
6326        #[test]
6327        fn test_create_graph() {
6328            let db = GrafeoDB::new_in_memory();
6329            let session = db.session();
6330
6331            session.execute("CREATE GRAPH mydb").unwrap();
6332
6333            // Should be able to USE it now
6334            session.execute("USE GRAPH mydb").unwrap();
6335            assert_eq!(session.current_graph(), Some("mydb".to_string()));
6336        }
6337
6338        #[test]
6339        fn test_create_graph_duplicate_errors() {
6340            let db = GrafeoDB::new_in_memory();
6341            let session = db.session();
6342
6343            session.execute("CREATE GRAPH mydb").unwrap();
6344            let result = session.execute("CREATE GRAPH mydb");
6345
6346            assert!(result.is_err());
6347            let err = result.unwrap_err().to_string();
6348            assert!(
6349                err.contains("already exists"),
6350                "Expected 'already exists' error, got: {err}"
6351            );
6352        }
6353
6354        #[test]
6355        fn test_create_graph_if_not_exists() {
6356            let db = GrafeoDB::new_in_memory();
6357            let session = db.session();
6358
6359            session.execute("CREATE GRAPH mydb").unwrap();
6360            // Should succeed silently with IF NOT EXISTS
6361            session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
6362        }
6363
6364        #[test]
6365        fn test_drop_graph() {
6366            let db = GrafeoDB::new_in_memory();
6367            let session = db.session();
6368
6369            session.execute("CREATE GRAPH mydb").unwrap();
6370            session.execute("DROP GRAPH mydb").unwrap();
6371
6372            // Should no longer be usable
6373            let result = session.execute("USE GRAPH mydb");
6374            assert!(result.is_err());
6375        }
6376
6377        #[test]
6378        fn test_drop_graph_nonexistent_errors() {
6379            let db = GrafeoDB::new_in_memory();
6380            let session = db.session();
6381
6382            let result = session.execute("DROP GRAPH nosuchgraph");
6383            assert!(result.is_err());
6384            let err = result.unwrap_err().to_string();
6385            assert!(
6386                err.contains("does not exist"),
6387                "Expected 'does not exist' error, got: {err}"
6388            );
6389        }
6390
6391        #[test]
6392        fn test_drop_graph_if_exists() {
6393            let db = GrafeoDB::new_in_memory();
6394            let session = db.session();
6395
6396            // Should succeed silently with IF EXISTS
6397            session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
6398        }
6399
6400        #[test]
6401        fn test_start_transaction_via_gql() {
6402            let db = GrafeoDB::new_in_memory();
6403            let session = db.session();
6404
6405            session.execute("START TRANSACTION").unwrap();
6406            assert!(session.in_transaction());
6407            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
6408            session.execute("COMMIT").unwrap();
6409            assert!(!session.in_transaction());
6410
6411            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
6412            assert_eq!(result.rows.len(), 1);
6413        }
6414
6415        #[test]
6416        fn test_start_transaction_read_only_blocks_insert() {
6417            let db = GrafeoDB::new_in_memory();
6418            let session = db.session();
6419
6420            session.execute("START TRANSACTION READ ONLY").unwrap();
6421            let result = session.execute("INSERT (:Person {name: 'Alix'})");
6422            assert!(result.is_err());
6423            let err = result.unwrap_err().to_string();
6424            assert!(
6425                err.contains("read-only"),
6426                "Expected read-only error, got: {err}"
6427            );
6428            session.execute("ROLLBACK").unwrap();
6429        }
6430
6431        #[test]
6432        fn test_start_transaction_read_only_allows_reads() {
6433            let db = GrafeoDB::new_in_memory();
6434            let mut session = db.session();
6435            session.begin_transaction().unwrap();
6436            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
6437            session.commit().unwrap();
6438
6439            session.execute("START TRANSACTION READ ONLY").unwrap();
6440            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
6441            assert_eq!(result.rows.len(), 1);
6442            session.execute("COMMIT").unwrap();
6443        }
6444
6445        #[test]
6446        fn test_rollback_via_gql() {
6447            let db = GrafeoDB::new_in_memory();
6448            let session = db.session();
6449
6450            session.execute("START TRANSACTION").unwrap();
6451            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
6452            session.execute("ROLLBACK").unwrap();
6453
6454            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
6455            assert!(result.rows.is_empty());
6456        }
6457
6458        #[test]
6459        fn test_start_transaction_with_isolation_level() {
6460            let db = GrafeoDB::new_in_memory();
6461            let session = db.session();
6462
6463            session
6464                .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
6465                .unwrap();
6466            assert!(session.in_transaction());
6467            session.execute("ROLLBACK").unwrap();
6468        }
6469
6470        #[test]
6471        fn test_session_commands_return_empty_result() {
6472            let db = GrafeoDB::new_in_memory();
6473            let session = db.session();
6474
6475            session.execute("CREATE GRAPH test").unwrap();
6476            let result = session.execute("SESSION SET GRAPH test").unwrap();
6477            assert_eq!(result.row_count(), 0);
6478            assert_eq!(result.column_count(), 0);
6479        }
6480
6481        #[test]
6482        fn test_current_graph_default_is_none() {
6483            let db = GrafeoDB::new_in_memory();
6484            let session = db.session();
6485
6486            assert_eq!(session.current_graph(), None);
6487        }
6488
6489        #[test]
6490        fn test_time_zone_default_is_none() {
6491            let db = GrafeoDB::new_in_memory();
6492            let session = db.session();
6493
6494            assert_eq!(session.time_zone(), None);
6495        }
6496
6497        #[test]
6498        fn test_session_state_independent_across_sessions() {
6499            let db = GrafeoDB::new_in_memory();
6500            let session1 = db.session();
6501            let session2 = db.session();
6502
6503            session1.execute("CREATE GRAPH first").unwrap();
6504            session1.execute("CREATE GRAPH second").unwrap();
6505            session1.execute("SESSION SET GRAPH first").unwrap();
6506            session2.execute("SESSION SET GRAPH second").unwrap();
6507
6508            assert_eq!(session1.current_graph(), Some("first".to_string()));
6509            assert_eq!(session2.current_graph(), Some("second".to_string()));
6510        }
6511
6512        #[test]
6513        fn test_show_node_types() {
6514            let db = GrafeoDB::new_in_memory();
6515            let session = db.session();
6516
6517            session
6518                .execute("CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)")
6519                .unwrap();
6520
6521            let result = session.execute("SHOW NODE TYPES").unwrap();
6522            assert_eq!(
6523                result.columns,
6524                vec!["name", "properties", "constraints", "parents"]
6525            );
6526            assert_eq!(result.rows.len(), 1);
6527            // First column is the type name
6528            assert_eq!(result.rows[0][0], Value::from("Person"));
6529        }
6530
6531        #[test]
6532        fn test_show_edge_types() {
6533            let db = GrafeoDB::new_in_memory();
6534            let session = db.session();
6535
6536            session
6537                .execute("CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)")
6538                .unwrap();
6539
6540            let result = session.execute("SHOW EDGE TYPES").unwrap();
6541            assert_eq!(
6542                result.columns,
6543                vec!["name", "properties", "source_types", "target_types"]
6544            );
6545            assert_eq!(result.rows.len(), 1);
6546            assert_eq!(result.rows[0][0], Value::from("KNOWS"));
6547        }
6548
6549        #[test]
6550        fn test_show_graph_types() {
6551            let db = GrafeoDB::new_in_memory();
6552            let session = db.session();
6553
6554            session
6555                .execute("CREATE NODE TYPE Person (name STRING)")
6556                .unwrap();
6557            session
6558                .execute(
6559                    "CREATE GRAPH TYPE social (\
6560                        NODE TYPE Person (name STRING)\
6561                    )",
6562                )
6563                .unwrap();
6564
6565            let result = session.execute("SHOW GRAPH TYPES").unwrap();
6566            assert_eq!(
6567                result.columns,
6568                vec!["name", "open", "node_types", "edge_types"]
6569            );
6570            assert_eq!(result.rows.len(), 1);
6571            assert_eq!(result.rows[0][0], Value::from("social"));
6572        }
6573
6574        #[test]
6575        fn test_show_graph_type_named() {
6576            let db = GrafeoDB::new_in_memory();
6577            let session = db.session();
6578
6579            session
6580                .execute("CREATE NODE TYPE Person (name STRING)")
6581                .unwrap();
6582            session
6583                .execute(
6584                    "CREATE GRAPH TYPE social (\
6585                        NODE TYPE Person (name STRING)\
6586                    )",
6587                )
6588                .unwrap();
6589
6590            let result = session.execute("SHOW GRAPH TYPE social").unwrap();
6591            assert_eq!(result.rows.len(), 1);
6592            assert_eq!(result.rows[0][0], Value::from("social"));
6593        }
6594
6595        #[test]
6596        fn test_show_graph_type_not_found() {
6597            let db = GrafeoDB::new_in_memory();
6598            let session = db.session();
6599
6600            let result = session.execute("SHOW GRAPH TYPE nonexistent");
6601            assert!(result.is_err());
6602        }
6603
6604        #[test]
6605        fn test_show_indexes_via_gql() {
6606            let db = GrafeoDB::new_in_memory();
6607            let session = db.session();
6608
6609            let result = session.execute("SHOW INDEXES").unwrap();
6610            assert_eq!(result.columns, vec!["name", "type", "label", "property"]);
6611        }
6612
6613        #[test]
6614        fn test_show_constraints_via_gql() {
6615            let db = GrafeoDB::new_in_memory();
6616            let session = db.session();
6617
6618            let result = session.execute("SHOW CONSTRAINTS").unwrap();
6619            assert_eq!(result.columns, vec!["name", "type", "label", "properties"]);
6620        }
6621
6622        #[test]
6623        fn test_pattern_form_graph_type_roundtrip() {
6624            let db = GrafeoDB::new_in_memory();
6625            let session = db.session();
6626
6627            // Register the types first
6628            session
6629                .execute("CREATE NODE TYPE Person (name STRING NOT NULL)")
6630                .unwrap();
6631            session
6632                .execute("CREATE NODE TYPE City (name STRING)")
6633                .unwrap();
6634            session
6635                .execute("CREATE EDGE TYPE KNOWS (since INTEGER)")
6636                .unwrap();
6637            session.execute("CREATE EDGE TYPE LIVES_IN").unwrap();
6638
6639            // Create graph type using pattern form
6640            session
6641                .execute(
6642                    "CREATE GRAPH TYPE social (\
6643                        (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
6644                        (:Person)-[:LIVES_IN]->(:City)\
6645                    )",
6646                )
6647                .unwrap();
6648
6649            // Verify it was created
6650            let result = session.execute("SHOW GRAPH TYPE social").unwrap();
6651            assert_eq!(result.rows.len(), 1);
6652            assert_eq!(result.rows[0][0], Value::from("social"));
6653        }
6654    }
6655}