Skip to main content

grafeo_engine/session/
mod.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7#[cfg(feature = "triple-store")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::AtomicUsize;
12#[cfg(feature = "lpg")]
13use std::sync::atomic::Ordering;
14use std::time::{Duration, Instant};
15
16#[cfg(feature = "lpg")]
17use grafeo_common::grafeo_debug_span;
18#[cfg(feature = "lpg")]
19use grafeo_common::types::{EdgeId, NodeId};
20use grafeo_common::types::{EpochId, TransactionId, Value};
21use grafeo_common::utils::error::Result;
22use grafeo_common::{grafeo_info_span, grafeo_warn};
23#[cfg(feature = "lpg")]
24use grafeo_core::graph::Direction;
25#[cfg(feature = "lpg")]
26use grafeo_core::graph::lpg::LpgStore;
27#[cfg(feature = "lpg")]
28use grafeo_core::graph::lpg::{Edge, Node};
29#[cfg(feature = "triple-store")]
30use grafeo_core::graph::rdf::RdfStore;
31use grafeo_core::graph::{GraphStore, GraphStoreMut, GraphStoreSearch};
32
33use crate::catalog::{Catalog, CatalogConstraintValidator};
34use crate::config::{AdaptiveConfig, GraphModel};
35use crate::database::QueryResult;
36use crate::query::Executor;
37use crate::query::cache::QueryCache;
38use crate::transaction::TransactionManager;
39
40/// Storage key suffix for the implicit default graph within a schema.
41/// Auto-created by `CREATE SCHEMA` and auto-dropped by `DROP SCHEMA`.
42const SCHEMA_DEFAULT_GRAPH: &str = "__default__";
43
44/// Parses a DDL default-value literal string into a [`Value`].
45///
46/// Handles string literals (single- or double-quoted), integers, floats,
47/// booleans (`true`/`false`), and `NULL`.
48fn parse_default_literal(text: &str) -> Value {
49    if text.eq_ignore_ascii_case("null") {
50        return Value::Null;
51    }
52    if text.eq_ignore_ascii_case("true") {
53        return Value::Bool(true);
54    }
55    if text.eq_ignore_ascii_case("false") {
56        return Value::Bool(false);
57    }
58    // String literal: strip surrounding quotes
59    if (text.starts_with('\'') && text.ends_with('\''))
60        || (text.starts_with('"') && text.ends_with('"'))
61    {
62        return Value::String(text[1..text.len() - 1].into());
63    }
64    // Try integer, then float
65    if let Ok(i) = text.parse::<i64>() {
66        return Value::Int64(i);
67    }
68    if let Ok(f) = text.parse::<f64>() {
69        return Value::Float64(f);
70    }
71    // Fallback: treat as string
72    Value::String(text.into())
73}
74
75/// Runtime configuration for creating a new session.
76///
77/// Groups the shared parameters passed to all session constructors, keeping
78/// call sites readable and avoiding long argument lists.
79pub(crate) struct SessionConfig {
80    pub transaction_manager: Arc<TransactionManager>,
81    pub query_cache: Arc<QueryCache>,
82    pub catalog: Arc<Catalog>,
83    pub adaptive_config: AdaptiveConfig,
84    pub factorized_execution: bool,
85    pub graph_model: GraphModel,
86    pub query_timeout: Option<Duration>,
87    pub max_property_size: Option<usize>,
88    /// Buffer manager for memory-aware query execution.
89    pub buffer_manager: Option<Arc<grafeo_common::memory::buffer::BufferManager>>,
90    pub commit_counter: Arc<AtomicUsize>,
91    pub gc_interval: usize,
92    /// When true, the session permanently blocks all mutations.
93    pub read_only: bool,
94    /// The identity bound to this session (for permission checks).
95    pub identity: crate::auth::Identity,
96    /// Named graph projections shared with the database.
97    #[cfg(feature = "lpg")]
98    pub projections: Arc<
99        parking_lot::RwLock<
100            std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>,
101        >,
102    >,
103}
104
105/// Your handle to the database - execute queries and manage transactions.
106///
107/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
108/// tracks its own transaction state, so you can have multiple concurrent
109/// sessions without them interfering.
110pub struct Session {
111    /// The underlying store.
112    #[cfg(feature = "lpg")]
113    store: Arc<LpgStore>,
114    /// Classifies the role of `store` for the active backend.
115    /// Search procedures (CALL grafeo.search.*) only reach into `store` when
116    /// this is `Active`. External-store sessions keep `store` as a placeholder
117    /// and must not expose it: it has no indexes or data.
118    #[cfg(feature = "lpg")]
119    lpg_backend: LpgBackend,
120    /// Graph store trait object for pluggable storage backends (read path).
121    graph_store: Arc<dyn GraphStoreSearch>,
122    /// Writable graph store (None for read-only databases).
123    graph_store_mut: Option<Arc<dyn GraphStoreMut>>,
124    /// Schema and metadata catalog shared across sessions.
125    catalog: Arc<Catalog>,
126    /// RDF triple store (if RDF feature is enabled).
127    #[cfg(feature = "triple-store")]
128    rdf_store: Arc<RdfStore>,
129    /// Transaction manager.
130    transaction_manager: Arc<TransactionManager>,
131    /// Query cache shared across sessions.
132    query_cache: Arc<QueryCache>,
133    /// Current transaction ID (if any). Behind a Mutex so that GQL commands
134    /// (`START TRANSACTION`, `COMMIT`, `ROLLBACK`) can manage transactions
135    /// from within `execute(&self)`.
136    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
137    /// Whether the current transaction is read-only (blocks mutations).
138    read_only_tx: parking_lot::Mutex<bool>,
139    /// Whether the database itself is read-only (set at open time, never changes).
140    /// When true, `read_only_tx` is always true regardless of transaction flags.
141    db_read_only: bool,
142    /// The identity bound to this session (determines permission level).
143    identity: crate::auth::Identity,
144    /// Whether the session is in auto-commit mode.
145    auto_commit: bool,
146    /// Adaptive execution configuration.
147    #[allow(dead_code)] // Stored for future adaptive re-optimization during execution
148    adaptive_config: AdaptiveConfig,
149    /// Whether to use factorized execution for multi-hop queries.
150    factorized_execution: bool,
151    /// The graph data model this session operates on.
152    graph_model: GraphModel,
153    /// Maximum time a query may run before being cancelled.
154    query_timeout: Option<Duration>,
155    /// Maximum size in bytes for a single property value.
156    max_property_size: Option<usize>,
157    /// Buffer manager for memory-aware execution (spill decisions).
158    buffer_manager: Option<Arc<grafeo_common::memory::buffer::BufferManager>>,
159    /// Shared commit counter for triggering auto-GC.
160    commit_counter: Arc<AtomicUsize>,
161    /// GC every N commits (0 = disabled).
162    gc_interval: usize,
163    /// Node count at the start of the current transaction (for PreparedCommit stats).
164    transaction_start_node_count: AtomicUsize,
165    /// Edge count at the start of the current transaction (for PreparedCommit stats).
166    transaction_start_edge_count: AtomicUsize,
167    /// WAL for logging schema changes.
168    #[cfg(feature = "wal")]
169    wal: Option<Arc<grafeo_storage::wal::LpgWal>>,
170    /// Shared WAL graph context tracker for named graph awareness.
171    #[cfg(feature = "wal")]
172    wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
173    /// CDC log for change tracking.
174    #[cfg(feature = "cdc")]
175    cdc_log: Arc<crate::cdc::CdcLog>,
176    /// Buffered CDC events for the current transaction.
177    /// Flushed to `cdc_log` on commit, discarded on rollback.
178    #[cfg(feature = "cdc")]
179    cdc_pending_events: Option<Arc<parking_lot::Mutex<Vec<crate::cdc::ChangeEvent>>>>,
180    /// Current graph name (for multi-graph USE GRAPH support). None = default graph.
181    current_graph: parking_lot::Mutex<Option<String>>,
182    /// Current schema name (ISO/IEC 39075 Section 4.7.3: independent from session graph).
183    /// None = "not set" (uses default schema).
184    current_schema: parking_lot::Mutex<Option<String>>,
185    /// Session time zone override.
186    time_zone: parking_lot::Mutex<Option<String>>,
187    /// Session-level parameters (SET PARAMETER).
188    session_params:
189        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
190    /// Override epoch for time-travel queries (None = use transaction/current epoch).
191    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
192    /// Savepoints within the current transaction.
193    savepoints: parking_lot::Mutex<Vec<SavepointState>>,
194    /// Nesting depth for nested transactions (0 = outermost).
195    /// Nested `START TRANSACTION` creates an auto-savepoint; nested `COMMIT`
196    /// releases it, nested `ROLLBACK` rolls back to it.
197    transaction_nesting_depth: parking_lot::Mutex<u32>,
198    /// Named graphs touched during the current transaction (for cross-graph atomicity).
199    /// `None` represents the default graph. Populated at `BEGIN` time and on each
200    /// `USE GRAPH` / `SESSION SET GRAPH` switch within a transaction.
201    touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
202    /// Count of active `ResultStream`s pinned to this session. Commit and
203    /// rollback block while any streams are outstanding so mid-iteration
204    /// snapshots are not invalidated.
205    active_streams: AtomicUsize,
206    /// Shared metrics registry (populated when the `metrics` feature is enabled).
207    #[cfg(feature = "metrics")]
208    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
209    /// Transaction start time for duration tracking.
210    #[cfg(feature = "metrics")]
211    tx_start_time: parking_lot::Mutex<Option<Instant>>,
212    /// Named graph projections shared with the database.
213    #[cfg(feature = "lpg")]
214    projections: Arc<
215        parking_lot::RwLock<
216            std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>,
217        >,
218    >,
219}
220
221/// Role of the session's internal `LpgStore`.
222#[cfg(feature = "lpg")]
223#[derive(Clone, Copy)]
224enum LpgBackend {
225    /// The internal `LpgStore` is the session's active backing store (possibly
226    /// wrapped by WAL/CDC/Layered decorators on the read/write path). Search
227    /// procedures can reach its HNSW/BM25 indexes.
228    Active,
229    /// The internal `LpgStore` is an empty placeholder because the session is
230    /// backed by an external `GraphStoreSearch` implementation. Search
231    /// procedures must not use it.
232    Placeholder,
233}
234
235/// Per-graph savepoint snapshot, capturing the store state at the time of the savepoint.
236#[derive(Clone)]
237struct GraphSavepoint {
238    graph_name: Option<String>,
239    next_node_id: u64,
240    next_edge_id: u64,
241    undo_log_position: usize,
242}
243
244/// Savepoint state: name + per-graph snapshots + the graph that was active.
245#[derive(Clone)]
246struct SavepointState {
247    name: String,
248    graph_snapshots: Vec<GraphSavepoint>,
249    /// The graph that was active when the savepoint was created.
250    /// Reserved for future use (e.g., restoring graph context on rollback).
251    #[allow(dead_code)]
252    active_graph: Option<String>,
253    /// CDC event buffer position at savepoint creation.
254    /// On rollback-to-savepoint, the buffer is truncated to this position.
255    #[cfg(feature = "cdc")]
256    cdc_event_position: usize,
257}
258
259impl Session {
260    /// Creates a new session with adaptive execution configuration.
261    #[cfg(feature = "lpg")]
262    #[allow(dead_code)] // Used when lpg enabled without triple-store
263    pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
264        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreSearch>;
265        let graph_store_mut = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
266        Self {
267            store,
268            lpg_backend: LpgBackend::Active,
269            graph_store,
270            graph_store_mut,
271            catalog: cfg.catalog,
272            #[cfg(feature = "triple-store")]
273            rdf_store: Arc::new(RdfStore::new()),
274            transaction_manager: cfg.transaction_manager,
275            query_cache: cfg.query_cache,
276            current_transaction: parking_lot::Mutex::new(None),
277            read_only_tx: parking_lot::Mutex::new(cfg.read_only),
278            db_read_only: cfg.read_only,
279            identity: cfg.identity,
280            auto_commit: true,
281            adaptive_config: cfg.adaptive_config,
282            factorized_execution: cfg.factorized_execution,
283            graph_model: cfg.graph_model,
284            query_timeout: cfg.query_timeout,
285            max_property_size: cfg.max_property_size,
286            buffer_manager: cfg.buffer_manager,
287            commit_counter: cfg.commit_counter,
288            gc_interval: cfg.gc_interval,
289            transaction_start_node_count: AtomicUsize::new(0),
290            transaction_start_edge_count: AtomicUsize::new(0),
291            #[cfg(feature = "wal")]
292            wal: None,
293            #[cfg(feature = "wal")]
294            wal_graph_context: None,
295            #[cfg(feature = "cdc")]
296            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
297            #[cfg(feature = "cdc")]
298            cdc_pending_events: None,
299            current_graph: parking_lot::Mutex::new(None),
300            current_schema: parking_lot::Mutex::new(None),
301            time_zone: parking_lot::Mutex::new(None),
302            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
303            viewing_epoch_override: parking_lot::Mutex::new(None),
304            savepoints: parking_lot::Mutex::new(Vec::new()),
305            transaction_nesting_depth: parking_lot::Mutex::new(0),
306            touched_graphs: parking_lot::Mutex::new(Vec::new()),
307            active_streams: AtomicUsize::new(0),
308            #[cfg(feature = "metrics")]
309            metrics: None,
310            #[cfg(feature = "metrics")]
311            tx_start_time: parking_lot::Mutex::new(None),
312            projections: cfg.projections,
313        }
314    }
315
316    /// Overrides the graph store and write store used by the query engine.
317    ///
318    /// Used by the layered store integration: the session's `store` field is
319    /// the overlay `LpgStore` (for MVCC), but reads and writes should route
320    /// through the `LayeredStore` (which merges base + overlay).
321    #[cfg(all(feature = "compact-store", feature = "lpg"))]
322    pub(crate) fn override_stores(
323        &mut self,
324        read_store: Arc<dyn GraphStoreSearch>,
325        write_store: Option<Arc<dyn GraphStoreMut>>,
326    ) {
327        self.graph_store = read_store;
328        self.graph_store_mut = write_store;
329    }
330
331    /// Sets the WAL for this session (shared with the database).
332    ///
333    /// This also wraps `graph_store` in a [`WalGraphStore`] so that mutation
334    /// operators (INSERT, DELETE, SET via queries) log to the WAL.
335    #[cfg(all(feature = "wal", feature = "lpg"))]
336    pub(crate) fn set_wal(
337        &mut self,
338        wal: Arc<grafeo_storage::wal::LpgWal>,
339        wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
340    ) {
341        // Wrap the graph store so query-engine mutations are WAL-logged
342        let wal_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
343            Arc::clone(&self.store),
344            Arc::clone(&wal),
345            Arc::clone(&wal_graph_context),
346        ));
347        self.graph_store = Arc::clone(&wal_store) as Arc<dyn GraphStoreSearch>;
348        self.graph_store_mut = Some(wal_store as Arc<dyn GraphStoreMut>);
349        self.wal = Some(wal);
350        self.wal_graph_context = Some(wal_graph_context);
351    }
352
353    /// Logs a WAL record if WAL is enabled. No-op for in-memory sessions.
354    ///
355    /// Mirrors `GrafeoDB::log_wal`: WAL write failures are logged via
356    /// `grafeo_warn!` and not propagated, so a transient WAL error never
357    /// fails a Session mutation. The caller is responsible for invoking
358    /// this immediately after the in-memory LPG mutation so that recovery
359    /// replays records in the same order writes happened.
360    #[cfg(all(feature = "wal", feature = "lpg"))]
361    pub(crate) fn log_wal_record(&self, record: &grafeo_storage::wal::WalRecord) {
362        if let Some(ref wal) = self.wal
363            && let Err(e) = wal.log(record)
364        {
365            grafeo_warn!("Session: failed to log WAL record: {}", e);
366        }
367    }
368
369    /// Sets the CDC log for this session (shared with the database).
370    ///
371    /// Wraps the current write store with a `CdcGraphStore` decorator so
372    /// that all session mutations (INSERT, SET, DELETE via query execution)
373    /// buffer CDC events. The buffer is flushed to the `CdcLog` on commit
374    /// and discarded on rollback.
375    #[cfg(feature = "cdc")]
376    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
377        // Wrap the WRITE store only with CdcGraphStore to intercept mutations.
378        // The read store (self.graph_store) is left unchanged for zero read overhead.
379        if let Some(ref write_store) = self.graph_store_mut {
380            let cdc_store = Arc::new(crate::database::cdc_store::CdcGraphStore::new(
381                Arc::clone(write_store),
382                Arc::clone(&cdc_log),
383            ));
384            self.cdc_pending_events = Some(cdc_store.pending_events());
385            self.graph_store_mut = Some(cdc_store as Arc<dyn grafeo_core::graph::GraphStoreMut>);
386        }
387        self.cdc_log = cdc_log;
388    }
389
390    /// Sets the metrics registry for this session (shared with the database).
391    #[cfg(feature = "metrics")]
392    pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
393        self.metrics = Some(metrics);
394    }
395
396    /// Creates a session backed by an external graph store.
397    ///
398    /// The external store handles all data operations. Transaction management
399    /// (begin/commit/rollback) is not supported for external stores.
400    ///
401    /// # Errors
402    ///
403    /// Returns an error if the internal arena allocation fails (out of memory).
404    pub(crate) fn with_external_store(
405        read_store: Arc<dyn GraphStoreSearch>,
406        write_store: Option<Arc<dyn GraphStoreMut>>,
407        cfg: SessionConfig,
408    ) -> Result<Self> {
409        Ok(Self {
410            #[cfg(feature = "lpg")]
411            store: Arc::new(LpgStore::new()?),
412            #[cfg(feature = "lpg")]
413            lpg_backend: LpgBackend::Placeholder,
414            graph_store: read_store,
415            graph_store_mut: write_store,
416            catalog: cfg.catalog,
417            #[cfg(feature = "triple-store")]
418            rdf_store: Arc::new(RdfStore::new()),
419            transaction_manager: cfg.transaction_manager,
420            query_cache: cfg.query_cache,
421            current_transaction: parking_lot::Mutex::new(None),
422            read_only_tx: parking_lot::Mutex::new(cfg.read_only),
423            db_read_only: cfg.read_only,
424            identity: cfg.identity,
425            auto_commit: true,
426            adaptive_config: cfg.adaptive_config,
427            factorized_execution: cfg.factorized_execution,
428            graph_model: cfg.graph_model,
429            query_timeout: cfg.query_timeout,
430            max_property_size: cfg.max_property_size,
431            buffer_manager: cfg.buffer_manager,
432            commit_counter: cfg.commit_counter,
433            gc_interval: cfg.gc_interval,
434            transaction_start_node_count: AtomicUsize::new(0),
435            transaction_start_edge_count: AtomicUsize::new(0),
436            #[cfg(feature = "wal")]
437            wal: None,
438            #[cfg(feature = "wal")]
439            wal_graph_context: None,
440            #[cfg(feature = "cdc")]
441            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
442            #[cfg(feature = "cdc")]
443            cdc_pending_events: None,
444            current_graph: parking_lot::Mutex::new(None),
445            current_schema: parking_lot::Mutex::new(None),
446            time_zone: parking_lot::Mutex::new(None),
447            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
448            viewing_epoch_override: parking_lot::Mutex::new(None),
449            savepoints: parking_lot::Mutex::new(Vec::new()),
450            transaction_nesting_depth: parking_lot::Mutex::new(0),
451            touched_graphs: parking_lot::Mutex::new(Vec::new()),
452            active_streams: AtomicUsize::new(0),
453            #[cfg(feature = "metrics")]
454            metrics: None,
455            #[cfg(feature = "metrics")]
456            tx_start_time: parking_lot::Mutex::new(None),
457            #[cfg(feature = "lpg")]
458            projections: cfg.projections,
459        })
460    }
461
462    /// Returns the graph model this session operates on.
463    #[must_use]
464    pub fn graph_model(&self) -> GraphModel {
465        self.graph_model
466    }
467
468    /// Returns the identity bound to this session.
469    #[must_use]
470    pub fn identity(&self) -> &crate::auth::Identity {
471        &self.identity
472    }
473
474    // === Session State Management ===
475
476    /// Sets the current graph for this session (USE GRAPH).
477    pub fn use_graph(&self, name: &str) {
478        *self.current_graph.lock() = Some(name.to_string());
479        self.track_graph_touch();
480    }
481
482    /// Returns the current graph name, if any.
483    #[must_use]
484    pub fn current_graph(&self) -> Option<String> {
485        self.current_graph.lock().clone()
486    }
487
488    /// Sets the current schema for this session (SESSION SET SCHEMA).
489    ///
490    /// Per ISO/IEC 39075 Section 7.1 GR1, this is independent of the session graph.
491    pub fn set_schema(&self, name: &str) {
492        *self.current_schema.lock() = Some(name.to_string());
493        self.track_graph_touch();
494    }
495
496    /// Returns the current schema name, if any.
497    ///
498    /// `None` means "not set", which resolves to the default schema.
499    #[must_use]
500    pub fn current_schema(&self) -> Option<String> {
501        self.current_schema.lock().clone()
502    }
503
504    /// Computes the effective storage key for a graph, accounting for schema context.
505    ///
506    /// Per ISO/IEC 39075 Section 17.2, graphs resolve relative to the current schema.
507    /// Uses `/` as separator since it is invalid in GQL identifiers.
508    fn effective_graph_key(&self, graph_name: &str) -> String {
509        let schema = self.current_schema.lock().clone();
510        match schema {
511            Some(s) => format!("{s}/{graph_name}"),
512            None => graph_name.to_string(),
513        }
514    }
515
516    /// Computes the effective storage key for a type, accounting for schema context.
517    ///
518    /// Mirrors `effective_graph_key()`: types resolve relative to the current schema.
519    fn effective_type_key(&self, type_name: &str) -> String {
520        let schema = self.current_schema.lock().clone();
521        match schema {
522            Some(s) => format!("{s}/{type_name}"),
523            None => type_name.to_string(),
524        }
525    }
526
527    /// Returns the effective storage key for the current graph, accounting for schema.
528    ///
529    /// Combines `current_schema` and `current_graph` into a flat lookup key.
530    fn active_graph_storage_key(&self) -> Option<String> {
531        let graph = self.current_graph.lock().clone();
532        let schema = self.current_schema.lock().clone();
533        match (&schema, &graph) {
534            (None, None) => None,
535            (Some(s), None) => Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}")),
536            (None, Some(name)) if name.eq_ignore_ascii_case("default") => None,
537            (Some(s), Some(name)) if name.eq_ignore_ascii_case("default") => {
538                Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}"))
539            }
540            (None, Some(name)) => Some(name.clone()),
541            (Some(s), Some(g)) => Some(format!("{s}/{g}")),
542        }
543    }
544
545    /// Returns the graph store for the currently active graph.
546    ///
547    /// If `current_graph` is `None` or `"default"`, returns the session's
548    /// default `graph_store` (already WAL-wrapped for the default graph).
549    /// Otherwise looks up the named graph in the root store and wraps it
550    /// in a [`WalGraphStore`] so mutations are WAL-logged with the correct
551    /// graph context.
552    fn active_store(&self) -> Arc<dyn GraphStoreSearch> {
553        let key = self.active_graph_storage_key();
554        match key {
555            None => Arc::clone(&self.graph_store),
556            #[cfg(feature = "lpg")]
557            Some(ref name) => match self.store.graph(name) {
558                Some(named_store) => {
559                    #[cfg(feature = "wal")]
560                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
561                        return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
562                            named_store,
563                            Arc::clone(wal),
564                            name.clone(),
565                            Arc::clone(ctx),
566                        )) as Arc<dyn GraphStoreSearch>;
567                    }
568                    named_store as Arc<dyn GraphStoreSearch>
569                }
570                None => Arc::clone(&self.graph_store),
571            },
572            #[cfg(not(feature = "lpg"))]
573            Some(_) => Arc::clone(&self.graph_store),
574        }
575    }
576
577    /// Returns the writable store for the active graph, if available.
578    ///
579    /// Returns `None` for read-only databases. For named graphs, wraps
580    /// the store with WAL logging when durability is enabled.
581    fn active_write_store(&self) -> Option<Arc<dyn GraphStoreMut>> {
582        let key = self.active_graph_storage_key();
583        match key {
584            None => self.graph_store_mut.as_ref().map(Arc::clone),
585            #[cfg(feature = "lpg")]
586            Some(ref name) => match self.store.graph(name) {
587                Some(named_store) => {
588                    let mut store: Arc<dyn GraphStoreMut> = named_store;
589
590                    #[cfg(feature = "wal")]
591                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
592                        store = Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
593                            // WAL needs Arc<LpgStore>, get it fresh
594                            self.store
595                                .graph(name)
596                                .unwrap_or_else(|| Arc::clone(&self.store)),
597                            Arc::clone(wal),
598                            name.clone(),
599                            Arc::clone(ctx),
600                        ));
601                    }
602
603                    #[cfg(feature = "cdc")]
604                    if let Some(ref pending) = self.cdc_pending_events {
605                        store = Arc::new(crate::database::cdc_store::CdcGraphStore::wrap(
606                            store,
607                            Arc::clone(&self.cdc_log),
608                            Arc::clone(pending),
609                        ));
610                    }
611
612                    Some(store)
613                }
614                None => self.graph_store_mut.as_ref().map(Arc::clone),
615            },
616            #[cfg(not(feature = "lpg"))]
617            Some(_) => self.graph_store_mut.as_ref().map(Arc::clone),
618        }
619    }
620
621    /// Returns the concrete `LpgStore` for the currently active graph.
622    ///
623    /// Used by direct CRUD methods that need the concrete store type
624    /// for versioned operations.
625    #[cfg(feature = "lpg")]
626    fn active_lpg_store(&self) -> Arc<LpgStore> {
627        let key = self.active_graph_storage_key();
628        match key {
629            None => Arc::clone(&self.store),
630            Some(ref name) => self
631                .store
632                .graph(name)
633                .unwrap_or_else(|| Arc::clone(&self.store)),
634        }
635    }
636
637    /// Resolves a graph name to a concrete `LpgStore`.
638    /// `None` and `"default"` resolve to the session's root store.
639    #[cfg(feature = "lpg")]
640    fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
641        match graph_name {
642            None => Arc::clone(&self.store),
643            Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
644            Some(name) => self
645                .store
646                .graph(name)
647                .unwrap_or_else(|| Arc::clone(&self.store)),
648        }
649    }
650
651    /// Records the current graph as "touched" if a transaction is active.
652    ///
653    /// Uses the full storage key (schema/graph) so that commit/rollback
654    /// can resolve the correct store via `resolve_store`. Called from
655    /// every setter that can change the active key (`use_graph`,
656    /// `set_schema`, `reset_*`) so mid-transaction context switches are
657    /// always captured; callers that mutate the active key via those
658    /// setters do not need to invoke this directly.
659    fn track_graph_touch(&self) {
660        if self.current_transaction.lock().is_some() {
661            let key = self.active_graph_storage_key();
662            let mut touched = self.touched_graphs.lock();
663            if !touched.contains(&key) {
664                touched.push(key);
665            }
666        }
667    }
668
669    /// Sets the session time zone.
670    pub fn set_time_zone(&self, tz: &str) {
671        *self.time_zone.lock() = Some(tz.to_string());
672    }
673
674    /// Returns the session time zone, if set.
675    #[must_use]
676    pub fn time_zone(&self) -> Option<String> {
677        self.time_zone.lock().clone()
678    }
679
680    /// Sets a session parameter.
681    pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
682        self.session_params.lock().insert(key.to_string(), value);
683    }
684
685    /// Gets a session parameter by cloning it.
686    #[must_use]
687    pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
688        self.session_params.lock().get(key).cloned()
689    }
690
691    /// Resets all session state to defaults (ISO/IEC 39075 Section 7.2).
692    pub fn reset_session(&self) {
693        *self.current_schema.lock() = None;
694        *self.current_graph.lock() = None;
695        *self.time_zone.lock() = None;
696        self.session_params.lock().clear();
697        *self.viewing_epoch_override.lock() = None;
698        self.track_graph_touch();
699    }
700
701    /// Resets only the session schema (Section 7.2 GR1).
702    pub fn reset_schema(&self) {
703        *self.current_schema.lock() = None;
704        self.track_graph_touch();
705    }
706
707    /// Resets only the session graph (Section 7.2 GR2).
708    pub fn reset_graph(&self) {
709        *self.current_graph.lock() = None;
710        self.track_graph_touch();
711    }
712
713    /// Resets only the session time zone (Section 7.2 GR3).
714    pub fn reset_time_zone(&self) {
715        *self.time_zone.lock() = None;
716    }
717
718    /// Resets only session parameters (Section 7.2 GR4).
719    pub fn reset_parameters(&self) {
720        self.session_params.lock().clear();
721    }
722
723    // --- Time-travel API ---
724
725    /// Sets a viewing epoch override for time-travel queries.
726    ///
727    /// While set, all queries on this session see the database as it existed
728    /// at the given epoch. Use [`clear_viewing_epoch`](Self::clear_viewing_epoch)
729    /// to return to normal behavior.
730    pub fn set_viewing_epoch(&self, epoch: EpochId) {
731        *self.viewing_epoch_override.lock() = Some(epoch);
732    }
733
734    /// Clears the viewing epoch override, returning to normal behavior.
735    pub fn clear_viewing_epoch(&self) {
736        *self.viewing_epoch_override.lock() = None;
737    }
738
739    /// Returns the current viewing epoch override, if any.
740    #[must_use]
741    pub fn viewing_epoch(&self) -> Option<EpochId> {
742        *self.viewing_epoch_override.lock()
743    }
744
745    /// Returns all versions of a node with their creation/deletion epochs.
746    ///
747    /// Properties and labels reflect the current state (not versioned per-epoch).
748    #[cfg(feature = "lpg")]
749    #[must_use]
750    pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
751        self.active_lpg_store().get_node_history(id)
752    }
753
754    /// Returns all versions of an edge with their creation/deletion epochs.
755    ///
756    /// Properties reflect the current state (not versioned per-epoch).
757    #[cfg(feature = "lpg")]
758    #[must_use]
759    pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
760        self.active_lpg_store().get_edge_history(id)
761    }
762
763    /// Checks that the session's graph model supports LPG operations.
764    fn require_lpg(&self, language: &str) -> Result<()> {
765        if self.graph_model == GraphModel::Rdf {
766            return Err(grafeo_common::utils::error::Error::Internal(format!(
767                "This is an RDF database. {language} queries require an LPG database."
768            )));
769        }
770        Ok(())
771    }
772
773    /// Checks that the session's identity is permitted to execute the given
774    /// statement kind. Returns an error if the role is insufficient.
775    ///
776    /// Short-circuits for Admin identities (the common case for embedded use
777    /// and benchmarks) to avoid any overhead on the hot path.
778    #[inline]
779    fn require_permission(&self, kind: crate::auth::StatementKind) -> Result<()> {
780        // Fast path: Admin can do everything
781        if self.identity.can_admin() {
782            return Ok(());
783        }
784        crate::auth::check_permission(&self.identity, kind).map_err(|denied| {
785            grafeo_common::utils::error::Error::Query(grafeo_common::utils::error::QueryError::new(
786                grafeo_common::utils::error::QueryErrorKind::Semantic,
787                denied.to_string(),
788            ))
789        })
790    }
791
792    /// Executes a session or transaction command, returning an empty result.
793    #[cfg(feature = "gql")]
794    fn execute_session_command(
795        &self,
796        cmd: grafeo_adapters::query::gql::ast::SessionCommand,
797    ) -> Result<QueryResult> {
798        use grafeo_adapters::query::gql::ast::SessionCommand;
799        #[cfg(feature = "lpg")]
800        use grafeo_adapters::query::gql::ast::TransactionIsolationLevel;
801        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
802
803        // Check role-based permission for graph management commands
804        match &cmd {
805            SessionCommand::CreateGraph { .. }
806            | SessionCommand::DropGraph { .. }
807            | SessionCommand::CreateProjection { .. }
808            | SessionCommand::DropProjection { .. } => {
809                self.require_permission(crate::auth::StatementKind::Write)?;
810            }
811            _ => {} // Session state + transaction control: always allowed
812        }
813
814        // Check per-graph grants for graph-scoped commands
815        if self.identity.has_grants() {
816            match &cmd {
817                SessionCommand::CreateGraph { name, .. }
818                | SessionCommand::DropGraph { name, .. }
819                    if !self
820                        .identity
821                        .can_access_graph(name, crate::auth::Role::ReadWrite) =>
822                {
823                    return Err(Error::Query(QueryError::new(
824                        QueryErrorKind::Semantic,
825                        format!(
826                            "permission denied: no grant for graph '{name}' (user: {})",
827                            self.identity.user_id()
828                        ),
829                    )));
830                }
831                _ => {}
832            }
833        }
834
835        // Block DDL in read-only transactions (ISO/IEC 39075 Section 8)
836        if *self.read_only_tx.lock() {
837            match &cmd {
838                SessionCommand::CreateGraph { .. }
839                | SessionCommand::DropGraph { .. }
840                | SessionCommand::CreateProjection { .. }
841                | SessionCommand::DropProjection { .. } => {
842                    return Err(Error::Transaction(
843                        grafeo_common::utils::error::TransactionError::ReadOnly,
844                    ));
845                }
846                _ => {} // Session state + transaction control allowed
847            }
848        }
849
850        match cmd {
851            #[cfg(feature = "lpg")]
852            SessionCommand::CreateGraph {
853                name,
854                if_not_exists,
855                typed,
856                like_graph,
857                copy_of,
858                open: _,
859            } => {
860                // ISO/IEC 39075 Section 12.4: graphs are created within the current schema
861                if name.contains('/') {
862                    return Err(Error::Query(QueryError::new(
863                        QueryErrorKind::Semantic,
864                        format!(
865                            "Graph name '{name}' must not contain '/' (reserved as schema/graph separator)"
866                        ),
867                    )));
868                }
869                let storage_key = self.effective_graph_key(&name);
870
871                // Validate source graph exists for LIKE / AS COPY OF
872                if let Some(ref src) = like_graph {
873                    let src_key = self.effective_graph_key(src);
874                    if self.store.graph(&src_key).is_none() {
875                        return Err(Error::Query(QueryError::new(
876                            QueryErrorKind::Semantic,
877                            format!("Source graph '{src}' does not exist"),
878                        )));
879                    }
880                }
881                if let Some(ref src) = copy_of {
882                    let src_key = self.effective_graph_key(src);
883                    if self.store.graph(&src_key).is_none() {
884                        return Err(Error::Query(QueryError::new(
885                            QueryErrorKind::Semantic,
886                            format!("Source graph '{src}' does not exist"),
887                        )));
888                    }
889                }
890
891                let created = self
892                    .store
893                    .create_graph(&storage_key)
894                    .map_err(|e| Error::Internal(e.to_string()))?;
895                if !created && !if_not_exists {
896                    return Err(Error::Query(QueryError::new(
897                        QueryErrorKind::Semantic,
898                        format!("Graph '{name}' already exists"),
899                    )));
900                }
901                if created {
902                    #[cfg(feature = "wal")]
903                    self.log_schema_wal(&grafeo_storage::wal::WalRecord::CreateNamedGraph {
904                        name: storage_key.clone(),
905                    });
906                }
907
908                // AS COPY OF: copy data from source graph
909                if let Some(ref src) = copy_of {
910                    let src_key = self.effective_graph_key(src);
911                    self.store
912                        .copy_graph(Some(&src_key), Some(&storage_key))
913                        .map_err(|e| Error::Internal(e.to_string()))?;
914                }
915
916                // Bind to graph type if specified.
917                // If the parser produced a '/' in the name it is already a qualified
918                // "schema/type" key; otherwise resolve against the current schema.
919                if let Some(type_name) = typed
920                    && let Err(e) = self.catalog.bind_graph_type(
921                        &storage_key,
922                        if type_name.contains('/') {
923                            type_name.clone()
924                        } else {
925                            self.effective_type_key(&type_name)
926                        },
927                    )
928                {
929                    return Err(Error::Query(QueryError::new(
930                        QueryErrorKind::Semantic,
931                        e.to_string(),
932                    )));
933                }
934
935                // LIKE: copy graph type binding from source
936                if let Some(ref src) = like_graph {
937                    let src_key = self.effective_graph_key(src);
938                    if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
939                        let _ = self.catalog.bind_graph_type(&storage_key, src_type);
940                    }
941                }
942
943                Ok(QueryResult::empty())
944            }
945            #[cfg(feature = "lpg")]
946            SessionCommand::DropGraph { name, if_exists } => {
947                let storage_key = self.effective_graph_key(&name);
948                let dropped = self.store.drop_graph(&storage_key);
949                if !dropped && !if_exists {
950                    return Err(Error::Query(QueryError::new(
951                        QueryErrorKind::Semantic,
952                        format!("Graph '{name}' does not exist"),
953                    )));
954                }
955                if dropped {
956                    #[cfg(feature = "wal")]
957                    self.log_schema_wal(&grafeo_storage::wal::WalRecord::DropNamedGraph {
958                        name: storage_key.clone(),
959                    });
960                    // If this session was using the dropped graph, reset to default
961                    let mut current = self.current_graph.lock();
962                    if current
963                        .as_deref()
964                        .is_some_and(|g| g.eq_ignore_ascii_case(&name))
965                    {
966                        *current = None;
967                    }
968                }
969                Ok(QueryResult::empty())
970            }
971            #[cfg(feature = "lpg")]
972            SessionCommand::UseGraph(name) => {
973                // Check per-graph grant before switching
974                if self.identity.has_grants()
975                    && !name.eq_ignore_ascii_case("default")
976                    && !self
977                        .identity
978                        .can_access_graph(&name, crate::auth::Role::ReadOnly)
979                {
980                    return Err(Error::Query(QueryError::new(
981                        QueryErrorKind::Semantic,
982                        format!(
983                            "permission denied: no grant for graph '{name}' (user: {})",
984                            self.identity.user_id()
985                        ),
986                    )));
987                }
988                // Verify graph exists (resolve within current schema)
989                let effective_key = self.effective_graph_key(&name);
990                if !name.eq_ignore_ascii_case("default")
991                    && self.store.graph(&effective_key).is_none()
992                {
993                    return Err(Error::Query(QueryError::new(
994                        QueryErrorKind::Semantic,
995                        format!("Graph '{name}' does not exist"),
996                    )));
997                }
998                self.use_graph(&name);
999                Ok(QueryResult::empty())
1000            }
1001            #[cfg(feature = "lpg")]
1002            SessionCommand::SessionSetGraph(name) => {
1003                // ISO/IEC 39075 Section 7.1 GR2: set session graph (resolved within current schema)
1004                // Check per-graph grant before switching (same as USE GRAPH)
1005                if self.identity.has_grants()
1006                    && !name.eq_ignore_ascii_case("default")
1007                    && !self
1008                        .identity
1009                        .can_access_graph(&name, crate::auth::Role::ReadOnly)
1010                {
1011                    return Err(Error::Query(QueryError::new(
1012                        QueryErrorKind::Semantic,
1013                        format!(
1014                            "permission denied: no grant for graph '{name}' (user: {})",
1015                            self.identity.user_id()
1016                        ),
1017                    )));
1018                }
1019                let effective_key = self.effective_graph_key(&name);
1020                if !name.eq_ignore_ascii_case("default")
1021                    && self.store.graph(&effective_key).is_none()
1022                {
1023                    return Err(Error::Query(QueryError::new(
1024                        QueryErrorKind::Semantic,
1025                        format!("Graph '{name}' does not exist"),
1026                    )));
1027                }
1028                self.use_graph(&name);
1029                Ok(QueryResult::empty())
1030            }
1031            SessionCommand::SessionSetSchema(name) => {
1032                // ISO/IEC 39075 Section 7.1 GR1: set session schema (independent of graph)
1033                if !self.catalog.schema_exists(&name) {
1034                    return Err(Error::Query(QueryError::new(
1035                        QueryErrorKind::Semantic,
1036                        format!("Schema '{name}' does not exist"),
1037                    )));
1038                }
1039                self.set_schema(&name);
1040                Ok(QueryResult::empty())
1041            }
1042            SessionCommand::SessionSetTimeZone(tz) => {
1043                self.set_time_zone(&tz);
1044                Ok(QueryResult::empty())
1045            }
1046            #[cfg(feature = "gql")]
1047            SessionCommand::SessionSetParameter(key, expr) => {
1048                if key.eq_ignore_ascii_case("viewing_epoch") {
1049                    match Self::eval_integer_literal(&expr) {
1050                        Some(n) if n >= 0 => {
1051                            // reason: guard ensures n >= 0
1052                            #[allow(clippy::cast_sign_loss)]
1053                            let epoch = n as u64;
1054                            self.set_viewing_epoch(EpochId::new(epoch));
1055                            Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
1056                        }
1057                        _ => Err(Error::Query(QueryError::new(
1058                            QueryErrorKind::Semantic,
1059                            "viewing_epoch must be a non-negative integer literal",
1060                        ))),
1061                    }
1062                } else {
1063                    // For now, store parameter name with Null value.
1064                    // Full expression evaluation would require building and executing a plan.
1065                    self.set_parameter(&key, Value::Null);
1066                    Ok(QueryResult::empty())
1067                }
1068            }
1069            SessionCommand::SessionReset(target) => {
1070                use grafeo_adapters::query::gql::ast::SessionResetTarget;
1071                match target {
1072                    SessionResetTarget::All => self.reset_session(),
1073                    SessionResetTarget::Schema => self.reset_schema(),
1074                    SessionResetTarget::Graph => self.reset_graph(),
1075                    SessionResetTarget::TimeZone => self.reset_time_zone(),
1076                    SessionResetTarget::Parameters => self.reset_parameters(),
1077                }
1078                Ok(QueryResult::empty())
1079            }
1080            SessionCommand::SessionClose => {
1081                self.reset_session();
1082                Ok(QueryResult::empty())
1083            }
1084            #[cfg(feature = "lpg")]
1085            SessionCommand::StartTransaction {
1086                read_only,
1087                isolation_level,
1088            } => {
1089                let engine_level = isolation_level.map(|l| match l {
1090                    TransactionIsolationLevel::ReadCommitted => {
1091                        crate::transaction::IsolationLevel::ReadCommitted
1092                    }
1093                    TransactionIsolationLevel::SnapshotIsolation => {
1094                        crate::transaction::IsolationLevel::SnapshotIsolation
1095                    }
1096                    TransactionIsolationLevel::Serializable => {
1097                        crate::transaction::IsolationLevel::Serializable
1098                    }
1099                });
1100                self.begin_transaction_inner(read_only, engine_level)?;
1101                Ok(QueryResult::status("Transaction started"))
1102            }
1103            #[cfg(feature = "lpg")]
1104            SessionCommand::Commit => {
1105                self.commit_inner()?;
1106                Ok(QueryResult::status("Transaction committed"))
1107            }
1108            #[cfg(feature = "lpg")]
1109            SessionCommand::Rollback => {
1110                self.rollback_inner()?;
1111                Ok(QueryResult::status("Transaction rolled back"))
1112            }
1113            #[cfg(feature = "lpg")]
1114            SessionCommand::Savepoint(name) => {
1115                self.savepoint(&name)?;
1116                Ok(QueryResult::status(format!("Savepoint '{name}' created")))
1117            }
1118            #[cfg(feature = "lpg")]
1119            SessionCommand::RollbackToSavepoint(name) => {
1120                self.rollback_to_savepoint(&name)?;
1121                Ok(QueryResult::status(format!(
1122                    "Rolled back to savepoint '{name}'"
1123                )))
1124            }
1125            #[cfg(feature = "lpg")]
1126            SessionCommand::ReleaseSavepoint(name) => {
1127                self.release_savepoint(&name)?;
1128                Ok(QueryResult::status(format!("Savepoint '{name}' released")))
1129            }
1130            #[cfg(feature = "lpg")]
1131            SessionCommand::CreateProjection {
1132                name,
1133                node_labels,
1134                edge_types,
1135            } => {
1136                use grafeo_core::graph::{GraphProjection, ProjectionSpec};
1137                use std::collections::hash_map::Entry;
1138
1139                let spec = ProjectionSpec::new()
1140                    .with_node_labels(node_labels)
1141                    .with_edge_types(edge_types);
1142
1143                let store = self.active_store();
1144                let projection = Arc::new(GraphProjection::new(store, spec));
1145                let mut projections = self.projections.write();
1146                match projections.entry(name.clone()) {
1147                    Entry::Occupied(_) => Err(Error::Query(QueryError::new(
1148                        QueryErrorKind::Semantic,
1149                        format!("Projection '{name}' already exists"),
1150                    ))),
1151                    Entry::Vacant(e) => {
1152                        e.insert(projection);
1153                        Ok(QueryResult::status(format!("Projection '{name}' created")))
1154                    }
1155                }
1156            }
1157            #[cfg(feature = "lpg")]
1158            SessionCommand::DropProjection { name } => {
1159                let removed = self.projections.write().remove(&name).is_some();
1160                if !removed {
1161                    return Err(Error::Query(QueryError::new(
1162                        QueryErrorKind::Semantic,
1163                        format!("Projection '{name}' does not exist"),
1164                    )));
1165                }
1166                Ok(QueryResult::status(format!("Projection '{name}' dropped")))
1167            }
1168            #[cfg(feature = "lpg")]
1169            SessionCommand::ShowProjections => {
1170                let mut names: Vec<String> = self.projections.read().keys().cloned().collect();
1171                names.sort();
1172                let rows: Vec<Vec<Value>> =
1173                    names.into_iter().map(|n| vec![Value::from(n)]).collect();
1174                Ok(QueryResult {
1175                    columns: vec!["name".to_string()],
1176                    column_types: Vec::new(),
1177                    rows,
1178                    ..QueryResult::empty()
1179                })
1180            }
1181            #[cfg(not(feature = "lpg"))]
1182            _ => Err(grafeo_common::utils::error::Error::Internal(
1183                "This command requires the `lpg` feature".to_string(),
1184            )),
1185        }
1186    }
1187
1188    /// Logs a WAL record for a schema change (no-op if WAL is not enabled).
1189    #[cfg(feature = "wal")]
1190    fn log_schema_wal(&self, record: &grafeo_storage::wal::WalRecord) {
1191        if let Some(ref wal) = self.wal
1192            && let Err(e) = wal.log(record)
1193        {
1194            grafeo_warn!("Failed to log schema change to WAL: {}", e);
1195        }
1196    }
1197
1198    /// Executes a schema DDL command, returning a status result.
1199    #[cfg(all(feature = "lpg", feature = "gql"))]
1200    fn execute_schema_command(
1201        &self,
1202        cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
1203    ) -> Result<QueryResult> {
1204        use crate::catalog::{
1205            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
1206        };
1207        use grafeo_adapters::query::gql::ast::SchemaStatement;
1208        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
1209        #[cfg(feature = "wal")]
1210        use grafeo_storage::wal::WalRecord;
1211
1212        /// Logs a WAL record for schema changes. Compiles to nothing without `wal`.
1213        macro_rules! wal_log {
1214            ($self:expr, $record:expr) => {
1215                #[cfg(feature = "wal")]
1216                $self.log_schema_wal(&$record);
1217            };
1218        }
1219
1220        let result = match cmd {
1221            SchemaStatement::CreateNodeType(stmt) => {
1222                let effective_name = self.effective_type_key(&stmt.name);
1223                #[cfg(feature = "wal")]
1224                let props_for_wal: Vec<(String, String, bool)> = stmt
1225                    .properties
1226                    .iter()
1227                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1228                    .collect();
1229                let def = NodeTypeDefinition {
1230                    name: effective_name.clone(),
1231                    properties: stmt
1232                        .properties
1233                        .iter()
1234                        .map(|p| TypedProperty {
1235                            name: p.name.clone(),
1236                            data_type: PropertyDataType::from_type_name(&p.data_type),
1237                            nullable: p.nullable,
1238                            default_value: p
1239                                .default_value
1240                                .as_ref()
1241                                .map(|s| parse_default_literal(s)),
1242                        })
1243                        .collect(),
1244                    constraints: Vec::new(),
1245                    parent_types: stmt.parent_types.clone(),
1246                };
1247                let result = if stmt.or_replace {
1248                    let _ = self.catalog.drop_node_type(&effective_name);
1249                    self.catalog.register_node_type(def)
1250                } else {
1251                    self.catalog.register_node_type(def)
1252                };
1253                match result {
1254                    Ok(()) => {
1255                        wal_log!(
1256                            self,
1257                            WalRecord::CreateNodeType {
1258                                name: effective_name.clone(),
1259                                properties: props_for_wal,
1260                                constraints: Vec::new(),
1261                            }
1262                        );
1263                        Ok(QueryResult::status(format!(
1264                            "Created node type '{}'",
1265                            stmt.name
1266                        )))
1267                    }
1268                    Err(e) if stmt.if_not_exists => {
1269                        let _ = e;
1270                        Ok(QueryResult::status("No change"))
1271                    }
1272                    Err(e) => Err(Error::Query(QueryError::new(
1273                        QueryErrorKind::Semantic,
1274                        e.to_string(),
1275                    ))),
1276                }
1277            }
1278            SchemaStatement::CreateEdgeType(stmt) => {
1279                let effective_name = self.effective_type_key(&stmt.name);
1280                #[cfg(feature = "wal")]
1281                let props_for_wal: Vec<(String, String, bool)> = stmt
1282                    .properties
1283                    .iter()
1284                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1285                    .collect();
1286                let def = EdgeTypeDefinition {
1287                    name: effective_name.clone(),
1288                    properties: stmt
1289                        .properties
1290                        .iter()
1291                        .map(|p| TypedProperty {
1292                            name: p.name.clone(),
1293                            data_type: PropertyDataType::from_type_name(&p.data_type),
1294                            nullable: p.nullable,
1295                            default_value: p
1296                                .default_value
1297                                .as_ref()
1298                                .map(|s| parse_default_literal(s)),
1299                        })
1300                        .collect(),
1301                    constraints: Vec::new(),
1302                    source_node_types: stmt.source_node_types.clone(),
1303                    target_node_types: stmt.target_node_types.clone(),
1304                };
1305                let result = if stmt.or_replace {
1306                    let _ = self.catalog.drop_edge_type_def(&effective_name);
1307                    self.catalog.register_edge_type_def(def)
1308                } else {
1309                    self.catalog.register_edge_type_def(def)
1310                };
1311                match result {
1312                    Ok(()) => {
1313                        wal_log!(
1314                            self,
1315                            WalRecord::CreateEdgeType {
1316                                name: effective_name.clone(),
1317                                properties: props_for_wal,
1318                                constraints: Vec::new(),
1319                            }
1320                        );
1321                        Ok(QueryResult::status(format!(
1322                            "Created edge type '{}'",
1323                            stmt.name
1324                        )))
1325                    }
1326                    Err(e) if stmt.if_not_exists => {
1327                        let _ = e;
1328                        Ok(QueryResult::status("No change"))
1329                    }
1330                    Err(e) => Err(Error::Query(QueryError::new(
1331                        QueryErrorKind::Semantic,
1332                        e.to_string(),
1333                    ))),
1334                }
1335            }
1336            SchemaStatement::CreateVectorIndex(stmt) => {
1337                Self::create_vector_index_on_store(
1338                    &self.active_lpg_store(),
1339                    &stmt.node_label,
1340                    &stmt.property,
1341                    stmt.dimensions,
1342                    stmt.metric.as_deref(),
1343                )?;
1344                wal_log!(
1345                    self,
1346                    WalRecord::CreateIndex {
1347                        name: stmt.name.clone(),
1348                        label: stmt.node_label.clone(),
1349                        property: stmt.property.clone(),
1350                        index_type: "vector".to_string(),
1351                    }
1352                );
1353                Ok(QueryResult::status(format!(
1354                    "Created vector index '{}'",
1355                    stmt.name
1356                )))
1357            }
1358            SchemaStatement::DropNodeType { name, if_exists } => {
1359                let effective_name = self.effective_type_key(&name);
1360                match self.catalog.drop_node_type(&effective_name) {
1361                    Ok(()) => {
1362                        wal_log!(
1363                            self,
1364                            WalRecord::DropNodeType {
1365                                name: effective_name
1366                            }
1367                        );
1368                        Ok(QueryResult::status(format!("Dropped node type '{name}'")))
1369                    }
1370                    Err(e) if if_exists => {
1371                        let _ = e;
1372                        Ok(QueryResult::status("No change"))
1373                    }
1374                    Err(e) => Err(Error::Query(QueryError::new(
1375                        QueryErrorKind::Semantic,
1376                        e.to_string(),
1377                    ))),
1378                }
1379            }
1380            SchemaStatement::DropEdgeType { name, if_exists } => {
1381                let effective_name = self.effective_type_key(&name);
1382                match self.catalog.drop_edge_type_def(&effective_name) {
1383                    Ok(()) => {
1384                        wal_log!(
1385                            self,
1386                            WalRecord::DropEdgeType {
1387                                name: effective_name
1388                            }
1389                        );
1390                        Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
1391                    }
1392                    Err(e) if if_exists => {
1393                        let _ = e;
1394                        Ok(QueryResult::status("No change"))
1395                    }
1396                    Err(e) => Err(Error::Query(QueryError::new(
1397                        QueryErrorKind::Semantic,
1398                        e.to_string(),
1399                    ))),
1400                }
1401            }
1402            SchemaStatement::CreateIndex(stmt) => {
1403                use crate::catalog::IndexType as CatalogIndexType;
1404                use grafeo_adapters::query::gql::ast::IndexKind;
1405                let active = self.active_lpg_store();
1406                let index_type_str = match stmt.index_kind {
1407                    IndexKind::Property => "property",
1408                    IndexKind::BTree => "btree",
1409                    IndexKind::Text => "text",
1410                    IndexKind::Vector => "vector",
1411                };
1412                match stmt.index_kind {
1413                    IndexKind::Property | IndexKind::BTree => {
1414                        for prop in &stmt.properties {
1415                            active.create_property_index(prop);
1416                        }
1417                    }
1418                    IndexKind::Text => {
1419                        for prop in &stmt.properties {
1420                            Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1421                        }
1422                    }
1423                    IndexKind::Vector => {
1424                        for prop in &stmt.properties {
1425                            Self::create_vector_index_on_store(
1426                                &active,
1427                                &stmt.label,
1428                                prop,
1429                                stmt.options.dimensions,
1430                                stmt.options.metric.as_deref(),
1431                            )?;
1432                        }
1433                    }
1434                }
1435                // Register each property index in the catalog for SHOW INDEXES
1436                // and DROP INDEX name-based lookup.
1437                let catalog_index_type = match stmt.index_kind {
1438                    IndexKind::Property => CatalogIndexType::Hash,
1439                    IndexKind::BTree => CatalogIndexType::BTree,
1440                    IndexKind::Text => CatalogIndexType::FullText,
1441                    IndexKind::Vector => CatalogIndexType::Hash,
1442                };
1443                let label_id = self.catalog.get_or_create_label(&stmt.label);
1444                for prop in &stmt.properties {
1445                    let prop_id = self.catalog.get_or_create_property_key(prop);
1446                    self.catalog
1447                        .create_index(&stmt.name, label_id, prop_id, catalog_index_type);
1448                }
1449                #[cfg(feature = "wal")]
1450                for prop in &stmt.properties {
1451                    wal_log!(
1452                        self,
1453                        WalRecord::CreateIndex {
1454                            name: stmt.name.clone(),
1455                            label: stmt.label.clone(),
1456                            property: prop.clone(),
1457                            index_type: index_type_str.to_string(),
1458                        }
1459                    );
1460                }
1461                Ok(QueryResult::status(format!(
1462                    "Created {} index '{}'",
1463                    index_type_str, stmt.name
1464                )))
1465            }
1466            SchemaStatement::DropIndex { name, if_exists } => {
1467                // Look up the index by name in the catalog to find the
1468                // underlying property, then drop from both catalog and store.
1469                if let Some(index_id) = self.catalog.find_index_by_name(&name) {
1470                    let def = self.catalog.get_index(index_id);
1471                    self.catalog.drop_index(index_id);
1472                    if let Some(def) = def
1473                        && let Some(prop_name) =
1474                            self.catalog.get_property_key_name(def.property_key)
1475                    {
1476                        self.active_lpg_store().drop_property_index(&prop_name);
1477                    }
1478                    wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1479                    Ok(QueryResult::status(format!("Dropped index '{name}'")))
1480                } else if if_exists {
1481                    Ok(QueryResult::status("No change".to_string()))
1482                } else {
1483                    Err(Error::Query(QueryError::new(
1484                        QueryErrorKind::Semantic,
1485                        format!("Index '{name}' does not exist"),
1486                    )))
1487                }
1488            }
1489            SchemaStatement::CreateConstraint(stmt) => {
1490                use crate::catalog::TypeConstraint;
1491                use grafeo_adapters::query::gql::ast::ConstraintKind;
1492                let kind_str = match stmt.constraint_kind {
1493                    ConstraintKind::Unique => "unique",
1494                    ConstraintKind::NodeKey => "node_key",
1495                    ConstraintKind::NotNull => "not_null",
1496                    ConstraintKind::Exists => "exists",
1497                };
1498                let constraint_name = stmt
1499                    .name
1500                    .clone()
1501                    .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1502
1503                // Register constraint in catalog type definitions
1504                match stmt.constraint_kind {
1505                    ConstraintKind::Unique => {
1506                        for prop in &stmt.properties {
1507                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1508                            let prop_id = self.catalog.get_or_create_property_key(prop);
1509                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1510                        }
1511                        let _ = self.catalog.add_constraint_to_type(
1512                            &stmt.label,
1513                            TypeConstraint::Unique(stmt.properties.clone()),
1514                        );
1515                    }
1516                    ConstraintKind::NodeKey => {
1517                        for prop in &stmt.properties {
1518                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1519                            let prop_id = self.catalog.get_or_create_property_key(prop);
1520                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1521                            let _ = self.catalog.add_required_property(label_id, prop_id);
1522                        }
1523                        let _ = self.catalog.add_constraint_to_type(
1524                            &stmt.label,
1525                            TypeConstraint::PrimaryKey(stmt.properties.clone()),
1526                        );
1527                    }
1528                    ConstraintKind::NotNull | ConstraintKind::Exists => {
1529                        for prop in &stmt.properties {
1530                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1531                            let prop_id = self.catalog.get_or_create_property_key(prop);
1532                            let _ = self.catalog.add_required_property(label_id, prop_id);
1533                            let _ = self.catalog.add_constraint_to_type(
1534                                &stmt.label,
1535                                TypeConstraint::NotNull(prop.clone()),
1536                            );
1537                        }
1538                    }
1539                }
1540
1541                wal_log!(
1542                    self,
1543                    WalRecord::CreateConstraint {
1544                        name: constraint_name.clone(),
1545                        label: stmt.label.clone(),
1546                        properties: stmt.properties.clone(),
1547                        kind: kind_str.to_string(),
1548                    }
1549                );
1550                Ok(QueryResult::status(format!(
1551                    "Created {kind_str} constraint '{constraint_name}'"
1552                )))
1553            }
1554            SchemaStatement::DropConstraint { name, if_exists } => {
1555                let _ = if_exists;
1556                wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1557                Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1558            }
1559            SchemaStatement::CreateGraphType(stmt) => {
1560                use crate::catalog::GraphTypeDefinition;
1561                use grafeo_adapters::query::gql::ast::InlineElementType;
1562
1563                let effective_name = self.effective_type_key(&stmt.name);
1564
1565                // GG04: LIKE clause copies type from existing graph
1566                let (mut node_types, mut edge_types, open) =
1567                    if let Some(ref like_graph) = stmt.like_graph {
1568                        // Infer types from the graph's bound type, or use its existing types
1569                        if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1570                            if let Some(existing) = self
1571                                .catalog
1572                                .schema()
1573                                .and_then(|s| s.get_graph_type(&type_name))
1574                            {
1575                                (
1576                                    existing.allowed_node_types.clone(),
1577                                    existing.allowed_edge_types.clone(),
1578                                    existing.open,
1579                                )
1580                            } else {
1581                                (Vec::new(), Vec::new(), true)
1582                            }
1583                        } else {
1584                            // GG22: Infer from graph data (labels used in graph)
1585                            let nt = self.catalog.all_node_type_names();
1586                            let et = self.catalog.all_edge_type_names();
1587                            if nt.is_empty() && et.is_empty() {
1588                                (Vec::new(), Vec::new(), true)
1589                            } else {
1590                                (nt, et, false)
1591                            }
1592                        }
1593                    } else {
1594                        // Prefix element type names with schema for consistency
1595                        let nt = stmt
1596                            .node_types
1597                            .iter()
1598                            .map(|n| self.effective_type_key(n))
1599                            .collect();
1600                        let et = stmt
1601                            .edge_types
1602                            .iter()
1603                            .map(|n| self.effective_type_key(n))
1604                            .collect();
1605                        (nt, et, stmt.open)
1606                    };
1607
1608                // GG03: Process inline element type entries. Per ISO/IEC 39075,
1609                // a bare `NODE TYPE Name` or `EDGE TYPE Name` inside a graph
1610                // type body is a reference; anything with a property block or
1611                // a KEY clause is an inline declaration. See issue #316.
1612                for inline in &stmt.inline_types {
1613                    match inline {
1614                        InlineElementType::Node {
1615                            name,
1616                            properties,
1617                            key_labels,
1618                            is_reference,
1619                            ..
1620                        } => {
1621                            let inline_effective = self.effective_type_key(name);
1622                            if *is_reference {
1623                                // Reference: validate existence; do not register, do not WAL.
1624                                if self.catalog.get_node_type(&inline_effective).is_none() {
1625                                    return Err(Error::Query(QueryError::new(
1626                                        QueryErrorKind::Semantic,
1627                                        format!(
1628                                            "Referenced node type '{inline_effective}' does not exist"
1629                                        ),
1630                                    )));
1631                                }
1632                            } else {
1633                                let def = NodeTypeDefinition {
1634                                    name: inline_effective.clone(),
1635                                    properties: properties
1636                                        .iter()
1637                                        .map(|p| TypedProperty {
1638                                            name: p.name.clone(),
1639                                            data_type: PropertyDataType::from_type_name(
1640                                                &p.data_type,
1641                                            ),
1642                                            nullable: p.nullable,
1643                                            default_value: None,
1644                                        })
1645                                        .collect(),
1646                                    constraints: Vec::new(),
1647                                    parent_types: key_labels.clone(),
1648                                };
1649                                self.catalog.register_or_replace_node_type(def);
1650                                #[cfg(feature = "wal")]
1651                                {
1652                                    let props_for_wal: Vec<(String, String, bool)> = properties
1653                                        .iter()
1654                                        .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1655                                        .collect();
1656                                    self.log_schema_wal(&WalRecord::CreateNodeType {
1657                                        name: inline_effective.clone(),
1658                                        properties: props_for_wal,
1659                                        constraints: Vec::new(),
1660                                    });
1661                                }
1662                            }
1663                            if !node_types.contains(&inline_effective) {
1664                                node_types.push(inline_effective);
1665                            }
1666                        }
1667                        InlineElementType::Edge {
1668                            name,
1669                            properties,
1670                            source_node_types,
1671                            target_node_types,
1672                            is_reference,
1673                            ..
1674                        } => {
1675                            let inline_effective = self.effective_type_key(name);
1676                            if *is_reference {
1677                                if self.catalog.get_edge_type_def(&inline_effective).is_none() {
1678                                    return Err(Error::Query(QueryError::new(
1679                                        QueryErrorKind::Semantic,
1680                                        format!(
1681                                            "Referenced edge type '{inline_effective}' does not exist"
1682                                        ),
1683                                    )));
1684                                }
1685                            } else {
1686                                let def = EdgeTypeDefinition {
1687                                    name: inline_effective.clone(),
1688                                    properties: properties
1689                                        .iter()
1690                                        .map(|p| TypedProperty {
1691                                            name: p.name.clone(),
1692                                            data_type: PropertyDataType::from_type_name(
1693                                                &p.data_type,
1694                                            ),
1695                                            nullable: p.nullable,
1696                                            default_value: None,
1697                                        })
1698                                        .collect(),
1699                                    constraints: Vec::new(),
1700                                    source_node_types: source_node_types.clone(),
1701                                    target_node_types: target_node_types.clone(),
1702                                };
1703                                self.catalog.register_or_replace_edge_type_def(def);
1704                                #[cfg(feature = "wal")]
1705                                {
1706                                    let props_for_wal: Vec<(String, String, bool)> = properties
1707                                        .iter()
1708                                        .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1709                                        .collect();
1710                                    self.log_schema_wal(&WalRecord::CreateEdgeType {
1711                                        name: inline_effective.clone(),
1712                                        properties: props_for_wal,
1713                                        constraints: Vec::new(),
1714                                    });
1715                                }
1716                            }
1717                            if !edge_types.contains(&inline_effective) {
1718                                edge_types.push(inline_effective);
1719                            }
1720                        }
1721                    }
1722                }
1723
1724                let def = GraphTypeDefinition {
1725                    name: effective_name.clone(),
1726                    allowed_node_types: node_types.clone(),
1727                    allowed_edge_types: edge_types.clone(),
1728                    open,
1729                };
1730                let result = if stmt.or_replace {
1731                    // Drop existing first, ignore error if not found
1732                    let _ = self.catalog.drop_graph_type(&effective_name);
1733                    self.catalog.register_graph_type(def)
1734                } else {
1735                    self.catalog.register_graph_type(def)
1736                };
1737                match result {
1738                    Ok(()) => {
1739                        wal_log!(
1740                            self,
1741                            WalRecord::CreateGraphType {
1742                                name: effective_name.clone(),
1743                                node_types,
1744                                edge_types,
1745                                open,
1746                            }
1747                        );
1748                        Ok(QueryResult::status(format!(
1749                            "Created graph type '{}'",
1750                            stmt.name
1751                        )))
1752                    }
1753                    Err(e) if stmt.if_not_exists => {
1754                        let _ = e;
1755                        Ok(QueryResult::status("No change"))
1756                    }
1757                    Err(e) => Err(Error::Query(QueryError::new(
1758                        QueryErrorKind::Semantic,
1759                        e.to_string(),
1760                    ))),
1761                }
1762            }
1763            SchemaStatement::DropGraphType { name, if_exists } => {
1764                let effective_name = self.effective_type_key(&name);
1765                match self.catalog.drop_graph_type(&effective_name) {
1766                    Ok(()) => {
1767                        wal_log!(
1768                            self,
1769                            WalRecord::DropGraphType {
1770                                name: effective_name
1771                            }
1772                        );
1773                        Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1774                    }
1775                    Err(e) if if_exists => {
1776                        let _ = e;
1777                        Ok(QueryResult::status("No change"))
1778                    }
1779                    Err(e) => Err(Error::Query(QueryError::new(
1780                        QueryErrorKind::Semantic,
1781                        e.to_string(),
1782                    ))),
1783                }
1784            }
1785            SchemaStatement::CreateSchema {
1786                name,
1787                if_not_exists,
1788            } => {
1789                if name.contains('/') {
1790                    return Err(Error::Query(QueryError::new(
1791                        QueryErrorKind::Semantic,
1792                        format!(
1793                            "Schema name '{name}' must not contain '/' (reserved as schema/graph separator)"
1794                        ),
1795                    )));
1796                }
1797                match self.catalog.register_schema_namespace(name.clone()) {
1798                    Ok(()) => {
1799                        wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1800                        // Auto-create the schema's default graph partition so that
1801                        // SESSION SET SCHEMA + queries work without an explicit graph.
1802                        let default_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1803                        if self.store.create_graph(&default_key).unwrap_or(false) {
1804                            wal_log!(self, WalRecord::CreateNamedGraph { name: default_key });
1805                        }
1806                        Ok(QueryResult::status(format!("Created schema '{name}'")))
1807                    }
1808                    Err(e) if if_not_exists => {
1809                        let _ = e;
1810                        Ok(QueryResult::status("No change"))
1811                    }
1812                    Err(e) => Err(Error::Query(QueryError::new(
1813                        QueryErrorKind::Semantic,
1814                        e.to_string(),
1815                    ))),
1816                }
1817            }
1818            SchemaStatement::DropSchema { name, if_exists } => {
1819                // ISO/IEC 39075 Section 12.3: schema must be empty before dropping.
1820                // The auto-created __default__ graph is exempt from the check.
1821                let prefix = format!("{name}/");
1822                let default_graph_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1823                let has_graphs = self
1824                    .store
1825                    .graph_names()
1826                    .iter()
1827                    .any(|g| g.starts_with(&prefix) && *g != default_graph_key);
1828                let has_types = self
1829                    .catalog
1830                    .all_node_type_names()
1831                    .iter()
1832                    .any(|n| n.starts_with(&prefix))
1833                    || self
1834                        .catalog
1835                        .all_edge_type_names()
1836                        .iter()
1837                        .any(|n| n.starts_with(&prefix))
1838                    || self
1839                        .catalog
1840                        .all_graph_type_names()
1841                        .iter()
1842                        .any(|n| n.starts_with(&prefix));
1843                if has_graphs || has_types {
1844                    return Err(Error::Query(QueryError::new(
1845                        QueryErrorKind::Semantic,
1846                        format!("Schema '{name}' is not empty: drop all graphs and types first"),
1847                    )));
1848                }
1849                match self.catalog.drop_schema_namespace(&name) {
1850                    Ok(()) => {
1851                        wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1852                        // Drop the auto-created default graph partition
1853                        if self.store.drop_graph(&default_graph_key) {
1854                            wal_log!(
1855                                self,
1856                                WalRecord::DropNamedGraph {
1857                                    name: default_graph_key,
1858                                }
1859                            );
1860                        }
1861                        // If this session was using the dropped schema, reset it
1862                        let mut current = self.current_schema.lock();
1863                        if current
1864                            .as_deref()
1865                            .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1866                        {
1867                            *current = None;
1868                        }
1869                        Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1870                    }
1871                    Err(e) if if_exists => {
1872                        let _ = e;
1873                        Ok(QueryResult::status("No change"))
1874                    }
1875                    Err(e) => Err(Error::Query(QueryError::new(
1876                        QueryErrorKind::Semantic,
1877                        e.to_string(),
1878                    ))),
1879                }
1880            }
1881            SchemaStatement::AlterNodeType(stmt) => {
1882                use grafeo_adapters::query::gql::ast::TypeAlteration;
1883                let effective_name = self.effective_type_key(&stmt.name);
1884                let mut wal_alts = Vec::new();
1885                for alt in &stmt.alterations {
1886                    match alt {
1887                        TypeAlteration::AddProperty(prop) => {
1888                            let typed = TypedProperty {
1889                                name: prop.name.clone(),
1890                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1891                                nullable: prop.nullable,
1892                                default_value: prop
1893                                    .default_value
1894                                    .as_ref()
1895                                    .map(|s| parse_default_literal(s)),
1896                            };
1897                            self.catalog
1898                                .alter_node_type_add_property(&effective_name, typed)
1899                                .map_err(|e| {
1900                                    Error::Query(QueryError::new(
1901                                        QueryErrorKind::Semantic,
1902                                        e.to_string(),
1903                                    ))
1904                                })?;
1905                            wal_alts.push((
1906                                "add".to_string(),
1907                                prop.name.clone(),
1908                                prop.data_type.clone(),
1909                                prop.nullable,
1910                            ));
1911                        }
1912                        TypeAlteration::DropProperty(name) => {
1913                            self.catalog
1914                                .alter_node_type_drop_property(&effective_name, name)
1915                                .map_err(|e| {
1916                                    Error::Query(QueryError::new(
1917                                        QueryErrorKind::Semantic,
1918                                        e.to_string(),
1919                                    ))
1920                                })?;
1921                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1922                        }
1923                    }
1924                }
1925                wal_log!(
1926                    self,
1927                    WalRecord::AlterNodeType {
1928                        name: effective_name,
1929                        alterations: wal_alts,
1930                    }
1931                );
1932                Ok(QueryResult::status(format!(
1933                    "Altered node type '{}'",
1934                    stmt.name
1935                )))
1936            }
1937            SchemaStatement::AlterEdgeType(stmt) => {
1938                use grafeo_adapters::query::gql::ast::TypeAlteration;
1939                let effective_name = self.effective_type_key(&stmt.name);
1940                let mut wal_alts = Vec::new();
1941                for alt in &stmt.alterations {
1942                    match alt {
1943                        TypeAlteration::AddProperty(prop) => {
1944                            let typed = TypedProperty {
1945                                name: prop.name.clone(),
1946                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1947                                nullable: prop.nullable,
1948                                default_value: prop
1949                                    .default_value
1950                                    .as_ref()
1951                                    .map(|s| parse_default_literal(s)),
1952                            };
1953                            self.catalog
1954                                .alter_edge_type_add_property(&effective_name, typed)
1955                                .map_err(|e| {
1956                                    Error::Query(QueryError::new(
1957                                        QueryErrorKind::Semantic,
1958                                        e.to_string(),
1959                                    ))
1960                                })?;
1961                            wal_alts.push((
1962                                "add".to_string(),
1963                                prop.name.clone(),
1964                                prop.data_type.clone(),
1965                                prop.nullable,
1966                            ));
1967                        }
1968                        TypeAlteration::DropProperty(name) => {
1969                            self.catalog
1970                                .alter_edge_type_drop_property(&effective_name, name)
1971                                .map_err(|e| {
1972                                    Error::Query(QueryError::new(
1973                                        QueryErrorKind::Semantic,
1974                                        e.to_string(),
1975                                    ))
1976                                })?;
1977                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1978                        }
1979                    }
1980                }
1981                wal_log!(
1982                    self,
1983                    WalRecord::AlterEdgeType {
1984                        name: effective_name,
1985                        alterations: wal_alts,
1986                    }
1987                );
1988                Ok(QueryResult::status(format!(
1989                    "Altered edge type '{}'",
1990                    stmt.name
1991                )))
1992            }
1993            SchemaStatement::AlterGraphType(stmt) => {
1994                use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1995                let effective_name = self.effective_type_key(&stmt.name);
1996                let mut wal_alts = Vec::new();
1997                for alt in &stmt.alterations {
1998                    match alt {
1999                        GraphTypeAlteration::AddNodeType(name) => {
2000                            self.catalog
2001                                .alter_graph_type_add_node_type(&effective_name, name.clone())
2002                                .map_err(|e| {
2003                                    Error::Query(QueryError::new(
2004                                        QueryErrorKind::Semantic,
2005                                        e.to_string(),
2006                                    ))
2007                                })?;
2008                            wal_alts.push(("add_node_type".to_string(), name.clone()));
2009                        }
2010                        GraphTypeAlteration::DropNodeType(name) => {
2011                            self.catalog
2012                                .alter_graph_type_drop_node_type(&effective_name, name)
2013                                .map_err(|e| {
2014                                    Error::Query(QueryError::new(
2015                                        QueryErrorKind::Semantic,
2016                                        e.to_string(),
2017                                    ))
2018                                })?;
2019                            wal_alts.push(("drop_node_type".to_string(), name.clone()));
2020                        }
2021                        GraphTypeAlteration::AddEdgeType(name) => {
2022                            self.catalog
2023                                .alter_graph_type_add_edge_type(&effective_name, name.clone())
2024                                .map_err(|e| {
2025                                    Error::Query(QueryError::new(
2026                                        QueryErrorKind::Semantic,
2027                                        e.to_string(),
2028                                    ))
2029                                })?;
2030                            wal_alts.push(("add_edge_type".to_string(), name.clone()));
2031                        }
2032                        GraphTypeAlteration::DropEdgeType(name) => {
2033                            self.catalog
2034                                .alter_graph_type_drop_edge_type(&effective_name, name)
2035                                .map_err(|e| {
2036                                    Error::Query(QueryError::new(
2037                                        QueryErrorKind::Semantic,
2038                                        e.to_string(),
2039                                    ))
2040                                })?;
2041                            wal_alts.push(("drop_edge_type".to_string(), name.clone()));
2042                        }
2043                    }
2044                }
2045                wal_log!(
2046                    self,
2047                    WalRecord::AlterGraphType {
2048                        name: effective_name,
2049                        alterations: wal_alts,
2050                    }
2051                );
2052                Ok(QueryResult::status(format!(
2053                    "Altered graph type '{}'",
2054                    stmt.name
2055                )))
2056            }
2057            SchemaStatement::CreateProcedure(stmt) => {
2058                use crate::catalog::ProcedureDefinition;
2059
2060                let def = ProcedureDefinition {
2061                    name: stmt.name.clone(),
2062                    params: stmt
2063                        .params
2064                        .iter()
2065                        .map(|p| (p.name.clone(), p.param_type.clone()))
2066                        .collect(),
2067                    returns: stmt
2068                        .returns
2069                        .iter()
2070                        .map(|r| (r.name.clone(), r.return_type.clone()))
2071                        .collect(),
2072                    body: stmt.body.clone(),
2073                };
2074
2075                if stmt.or_replace {
2076                    self.catalog.replace_procedure(def).map_err(|e| {
2077                        Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
2078                    })?;
2079                } else {
2080                    match self.catalog.register_procedure(def) {
2081                        Ok(()) => {}
2082                        Err(_) if stmt.if_not_exists => {
2083                            return Ok(QueryResult::empty());
2084                        }
2085                        Err(e) => {
2086                            return Err(Error::Query(QueryError::new(
2087                                QueryErrorKind::Semantic,
2088                                e.to_string(),
2089                            )));
2090                        }
2091                    }
2092                }
2093
2094                wal_log!(
2095                    self,
2096                    WalRecord::CreateProcedure {
2097                        name: stmt.name.clone(),
2098                        params: stmt
2099                            .params
2100                            .iter()
2101                            .map(|p| (p.name.clone(), p.param_type.clone()))
2102                            .collect(),
2103                        returns: stmt
2104                            .returns
2105                            .iter()
2106                            .map(|r| (r.name.clone(), r.return_type.clone()))
2107                            .collect(),
2108                        body: stmt.body,
2109                    }
2110                );
2111                Ok(QueryResult::status(format!(
2112                    "Created procedure '{}'",
2113                    stmt.name
2114                )))
2115            }
2116            SchemaStatement::DropProcedure { name, if_exists } => {
2117                match self.catalog.drop_procedure(&name) {
2118                    Ok(()) => {}
2119                    Err(_) if if_exists => {
2120                        return Ok(QueryResult::empty());
2121                    }
2122                    Err(e) => {
2123                        return Err(Error::Query(QueryError::new(
2124                            QueryErrorKind::Semantic,
2125                            e.to_string(),
2126                        )));
2127                    }
2128                }
2129                wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
2130                Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
2131            }
2132            SchemaStatement::ShowIndexes => {
2133                return self.execute_show_indexes();
2134            }
2135            SchemaStatement::ShowConstraints => {
2136                return self.execute_show_constraints();
2137            }
2138            SchemaStatement::ShowNodeTypes => {
2139                return self.execute_show_node_types();
2140            }
2141            SchemaStatement::ShowEdgeTypes => {
2142                return self.execute_show_edge_types();
2143            }
2144            SchemaStatement::ShowGraphTypes => {
2145                return self.execute_show_graph_types();
2146            }
2147            SchemaStatement::ShowGraphType(name) => {
2148                return self.execute_show_graph_type(&name);
2149            }
2150            SchemaStatement::ShowCurrentGraphType => {
2151                return self.execute_show_current_graph_type();
2152            }
2153            SchemaStatement::ShowGraphs => {
2154                return self.execute_show_graphs();
2155            }
2156            SchemaStatement::ShowSchemas => {
2157                return self.execute_show_schemas();
2158            }
2159        };
2160
2161        // Invalidate all cached query plans after any successful DDL change.
2162        // DDL is rare, so clearing the entire cache is cheap and correct.
2163        if result.is_ok() {
2164            self.query_cache.clear();
2165        }
2166
2167        result
2168    }
2169
2170    /// Creates a vector index on the store by scanning existing nodes.
2171    #[cfg(all(feature = "lpg", feature = "gql", feature = "vector-index"))]
2172    fn create_vector_index_on_store(
2173        store: &LpgStore,
2174        label: &str,
2175        property: &str,
2176        dimensions: Option<usize>,
2177        metric: Option<&str>,
2178    ) -> Result<()> {
2179        use grafeo_common::types::{PropertyKey, Value};
2180        use grafeo_common::utils::error::Error;
2181        use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex, VectorIndexKind};
2182
2183        let metric = match metric {
2184            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
2185                Error::Internal(format!(
2186                    "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
2187                ))
2188            })?,
2189            None => DistanceMetric::Cosine,
2190        };
2191
2192        let prop_key = PropertyKey::new(property);
2193        let mut found_dims: Option<usize> = dimensions;
2194        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
2195
2196        for node in store.nodes_with_label(label) {
2197            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
2198                if let Some(expected) = found_dims {
2199                    if v.len() != expected {
2200                        return Err(Error::Internal(format!(
2201                            "Vector dimension mismatch: expected {expected}, found {} on node {}",
2202                            v.len(),
2203                            node.id.0
2204                        )));
2205                    }
2206                } else {
2207                    found_dims = Some(v.len());
2208                }
2209                vectors.push((node.id, v.to_vec()));
2210            }
2211        }
2212
2213        let Some(dims) = found_dims else {
2214            return Err(Error::Internal(format!(
2215                "No vector properties found on :{label}({property}) and no dimensions specified"
2216            )));
2217        };
2218
2219        let config = HnswConfig::new(dims, metric);
2220        let index = HnswIndex::with_capacity(config, vectors.len());
2221        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
2222        for (node_id, vec) in &vectors {
2223            index.insert(*node_id, vec, &accessor);
2224        }
2225
2226        store.add_vector_index(label, property, Arc::new(VectorIndexKind::Hnsw(index)));
2227        Ok(())
2228    }
2229
2230    /// Stub for when vector-index feature is not enabled.
2231    #[cfg(all(feature = "lpg", feature = "gql", not(feature = "vector-index")))]
2232    fn create_vector_index_on_store(
2233        _store: &LpgStore,
2234        _label: &str,
2235        _property: &str,
2236        _dimensions: Option<usize>,
2237        _metric: Option<&str>,
2238    ) -> Result<()> {
2239        Err(grafeo_common::utils::error::Error::Internal(
2240            "Vector index support requires the 'vector-index' feature".to_string(),
2241        ))
2242    }
2243
2244    /// Creates a text index on the store by scanning existing nodes.
2245    #[cfg(all(feature = "lpg", feature = "gql", feature = "text-index"))]
2246    fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
2247        use grafeo_common::types::{PropertyKey, Value};
2248        use grafeo_core::index::text::{BM25Config, InvertedIndex};
2249
2250        let mut index = InvertedIndex::new(BM25Config::default());
2251        let prop_key = PropertyKey::new(property);
2252
2253        let nodes = store.nodes_by_label(label);
2254        for node_id in nodes {
2255            if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
2256                index.insert(node_id, text.as_str());
2257            }
2258        }
2259
2260        store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
2261        Ok(())
2262    }
2263
2264    /// Stub for when text-index feature is not enabled.
2265    #[cfg(all(feature = "lpg", feature = "gql", not(feature = "text-index")))]
2266    fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
2267        Err(grafeo_common::utils::error::Error::Internal(
2268            "Text index support requires the 'text-index' feature".to_string(),
2269        ))
2270    }
2271
2272    /// Returns a table of all indexes from the catalog.
2273    fn execute_show_indexes(&self) -> Result<QueryResult> {
2274        let indexes = self.catalog.all_indexes();
2275        let columns = vec![
2276            "name".to_string(),
2277            "type".to_string(),
2278            "label".to_string(),
2279            "property".to_string(),
2280        ];
2281        let rows: Vec<Vec<Value>> = indexes
2282            .into_iter()
2283            .map(|def| {
2284                let label_name = self
2285                    .catalog
2286                    .get_label_name(def.label)
2287                    .unwrap_or_else(|| "?".into());
2288                let prop_name = self
2289                    .catalog
2290                    .get_property_key_name(def.property_key)
2291                    .unwrap_or_else(|| "?".into());
2292                vec![
2293                    Value::from(def.name),
2294                    Value::from(format!("{:?}", def.index_type)),
2295                    Value::from(&*label_name),
2296                    Value::from(&*prop_name),
2297                ]
2298            })
2299            .collect();
2300        Ok(QueryResult {
2301            columns,
2302            column_types: Vec::new(),
2303            rows,
2304            ..QueryResult::empty()
2305        })
2306    }
2307
2308    /// Returns a table of all constraints (currently metadata-only).
2309    fn execute_show_constraints(&self) -> Result<QueryResult> {
2310        // Constraints are tracked in WAL but not yet in a queryable catalog.
2311        // Return an empty table with the expected schema.
2312        Ok(QueryResult {
2313            columns: vec![
2314                "name".to_string(),
2315                "type".to_string(),
2316                "label".to_string(),
2317                "properties".to_string(),
2318            ],
2319            column_types: Vec::new(),
2320            rows: Vec::new(),
2321            ..QueryResult::empty()
2322        })
2323    }
2324
2325    /// Returns a table of all registered node types in the current schema.
2326    fn execute_show_node_types(&self) -> Result<QueryResult> {
2327        let columns = vec![
2328            "name".to_string(),
2329            "properties".to_string(),
2330            "constraints".to_string(),
2331            "parents".to_string(),
2332        ];
2333        let schema = self.current_schema.lock().clone();
2334        let all_names = self.catalog.all_node_type_names();
2335        let type_names: Vec<String> = match &schema {
2336            Some(s) => {
2337                let prefix = format!("{s}/");
2338                all_names
2339                    .into_iter()
2340                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2341                    .collect()
2342            }
2343            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2344        };
2345        let rows: Vec<Vec<Value>> = type_names
2346            .into_iter()
2347            .filter_map(|name| {
2348                let lookup = match &schema {
2349                    Some(s) => format!("{s}/{name}"),
2350                    None => name.clone(),
2351                };
2352                let def = self.catalog.get_node_type(&lookup)?;
2353                let props: Vec<String> = def
2354                    .properties
2355                    .iter()
2356                    .map(|p| {
2357                        let nullable = if p.nullable { "" } else { " NOT NULL" };
2358                        format!("{} {}{}", p.name, p.data_type, nullable)
2359                    })
2360                    .collect();
2361                let constraints: Vec<String> =
2362                    def.constraints.iter().map(|c| format!("{c:?}")).collect();
2363                let parents = def.parent_types.join(", ");
2364                Some(vec![
2365                    Value::from(name),
2366                    Value::from(props.join(", ")),
2367                    Value::from(constraints.join(", ")),
2368                    Value::from(parents),
2369                ])
2370            })
2371            .collect();
2372        Ok(QueryResult {
2373            columns,
2374            column_types: Vec::new(),
2375            rows,
2376            ..QueryResult::empty()
2377        })
2378    }
2379
2380    /// Returns a table of all registered edge types in the current schema.
2381    fn execute_show_edge_types(&self) -> Result<QueryResult> {
2382        let columns = vec![
2383            "name".to_string(),
2384            "properties".to_string(),
2385            "source_types".to_string(),
2386            "target_types".to_string(),
2387        ];
2388        let schema = self.current_schema.lock().clone();
2389        let all_names = self.catalog.all_edge_type_names();
2390        let type_names: Vec<String> = match &schema {
2391            Some(s) => {
2392                let prefix = format!("{s}/");
2393                all_names
2394                    .into_iter()
2395                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2396                    .collect()
2397            }
2398            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2399        };
2400        let rows: Vec<Vec<Value>> = type_names
2401            .into_iter()
2402            .filter_map(|name| {
2403                let lookup = match &schema {
2404                    Some(s) => format!("{s}/{name}"),
2405                    None => name.clone(),
2406                };
2407                let def = self.catalog.get_edge_type_def(&lookup)?;
2408                let props: Vec<String> = def
2409                    .properties
2410                    .iter()
2411                    .map(|p| {
2412                        let nullable = if p.nullable { "" } else { " NOT NULL" };
2413                        format!("{} {}{}", p.name, p.data_type, nullable)
2414                    })
2415                    .collect();
2416                let src = def.source_node_types.join(", ");
2417                let tgt = def.target_node_types.join(", ");
2418                Some(vec![
2419                    Value::from(name),
2420                    Value::from(props.join(", ")),
2421                    Value::from(src),
2422                    Value::from(tgt),
2423                ])
2424            })
2425            .collect();
2426        Ok(QueryResult {
2427            columns,
2428            column_types: Vec::new(),
2429            rows,
2430            ..QueryResult::empty()
2431        })
2432    }
2433
2434    /// Returns a table of all registered graph types in the current schema.
2435    fn execute_show_graph_types(&self) -> Result<QueryResult> {
2436        let columns = vec![
2437            "name".to_string(),
2438            "open".to_string(),
2439            "node_types".to_string(),
2440            "edge_types".to_string(),
2441        ];
2442        let schema = self.current_schema.lock().clone();
2443        let all_names = self.catalog.all_graph_type_names();
2444        let type_names: Vec<String> = match &schema {
2445            Some(s) => {
2446                let prefix = format!("{s}/");
2447                all_names
2448                    .into_iter()
2449                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2450                    .collect()
2451            }
2452            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2453        };
2454        let rows: Vec<Vec<Value>> = type_names
2455            .into_iter()
2456            .filter_map(|name| {
2457                let lookup = match &schema {
2458                    Some(s) => format!("{s}/{name}"),
2459                    None => name.clone(),
2460                };
2461                let def = self.catalog.get_graph_type_def(&lookup)?;
2462                // Strip schema prefix from allowed type names for display
2463                let strip = |n: &String| -> String {
2464                    match &schema {
2465                        Some(s) => n.strip_prefix(&format!("{s}/")).unwrap_or(n).to_string(),
2466                        None => n.clone(),
2467                    }
2468                };
2469                let node_types: Vec<String> = def.allowed_node_types.iter().map(strip).collect();
2470                let edge_types: Vec<String> = def.allowed_edge_types.iter().map(strip).collect();
2471                Some(vec![
2472                    Value::from(name),
2473                    Value::from(def.open),
2474                    Value::from(node_types.join(", ")),
2475                    Value::from(edge_types.join(", ")),
2476                ])
2477            })
2478            .collect();
2479        Ok(QueryResult {
2480            columns,
2481            column_types: Vec::new(),
2482            rows,
2483            ..QueryResult::empty()
2484        })
2485    }
2486
2487    /// Returns the list of named graphs visible in the current schema context.
2488    ///
2489    /// When a session schema is set, only graphs belonging to that schema are
2490    /// shown (their compound prefix is stripped). When no schema is set, graphs
2491    /// without a schema prefix are shown (the default schema).
2492    #[cfg(feature = "lpg")]
2493    fn execute_show_graphs(&self) -> Result<QueryResult> {
2494        let schema = self.current_schema.lock().clone();
2495        let all_names = self.store.graph_names();
2496
2497        let mut names: Vec<String> = match &schema {
2498            Some(s) => {
2499                let prefix = format!("{s}/");
2500                all_names
2501                    .into_iter()
2502                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2503                    .filter(|n| n != SCHEMA_DEFAULT_GRAPH)
2504                    .collect()
2505            }
2506            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2507        };
2508        names.sort();
2509
2510        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2511        Ok(QueryResult {
2512            columns: vec!["name".to_string()],
2513            column_types: Vec::new(),
2514            rows,
2515            ..QueryResult::empty()
2516        })
2517    }
2518
2519    /// Returns the list of all schema namespaces.
2520    fn execute_show_schemas(&self) -> Result<QueryResult> {
2521        let mut names = self.catalog.schema_names();
2522        names.sort();
2523        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2524        Ok(QueryResult {
2525            columns: vec!["name".to_string()],
2526            column_types: Vec::new(),
2527            rows,
2528            ..QueryResult::empty()
2529        })
2530    }
2531
2532    /// Returns detailed info for a specific graph type.
2533    fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
2534        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
2535
2536        let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
2537            Error::Query(QueryError::new(
2538                QueryErrorKind::Semantic,
2539                format!("Graph type '{name}' not found"),
2540            ))
2541        })?;
2542
2543        let columns = vec![
2544            "name".to_string(),
2545            "open".to_string(),
2546            "node_types".to_string(),
2547            "edge_types".to_string(),
2548        ];
2549        let rows = vec![vec![
2550            Value::from(def.name),
2551            Value::from(def.open),
2552            Value::from(def.allowed_node_types.join(", ")),
2553            Value::from(def.allowed_edge_types.join(", ")),
2554        ]];
2555        Ok(QueryResult {
2556            columns,
2557            column_types: Vec::new(),
2558            rows,
2559            ..QueryResult::empty()
2560        })
2561    }
2562
2563    /// Returns the graph type bound to the current graph.
2564    fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
2565        let graph_name = self
2566            .current_graph()
2567            .unwrap_or_else(|| "default".to_string());
2568        let columns = vec![
2569            "graph".to_string(),
2570            "graph_type".to_string(),
2571            "open".to_string(),
2572            "node_types".to_string(),
2573            "edge_types".to_string(),
2574        ];
2575
2576        if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
2577            && let Some(def) = self.catalog.get_graph_type_def(&type_name)
2578        {
2579            let rows = vec![vec![
2580                Value::from(graph_name),
2581                Value::from(type_name),
2582                Value::from(def.open),
2583                Value::from(def.allowed_node_types.join(", ")),
2584                Value::from(def.allowed_edge_types.join(", ")),
2585            ]];
2586            return Ok(QueryResult {
2587                columns,
2588                column_types: Vec::new(),
2589                rows,
2590                ..QueryResult::empty()
2591            });
2592        }
2593
2594        // No graph type binding found
2595        Ok(QueryResult {
2596            columns,
2597            column_types: Vec::new(),
2598            rows: vec![vec![
2599                Value::from(graph_name),
2600                Value::Null,
2601                Value::Null,
2602                Value::Null,
2603                Value::Null,
2604            ]],
2605            ..QueryResult::empty()
2606        })
2607    }
2608
2609    /// Executes a GQL query.
2610    ///
2611    /// # Errors
2612    ///
2613    /// Returns an error if the query fails to parse or execute.
2614    ///
2615    /// # Examples
2616    ///
2617    /// ```no_run
2618    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2619    /// use grafeo_engine::GrafeoDB;
2620    ///
2621    /// let db = GrafeoDB::new_in_memory();
2622    /// let session = db.session();
2623    ///
2624    /// // Create a node
2625    /// session.execute("INSERT (:Person {name: 'Alix', age: 30})")?;
2626    ///
2627    /// // Query nodes
2628    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
2629    /// for row in result.rows() {
2630    ///     println!("{:?}", row);
2631    /// }
2632    /// # Ok(())
2633    /// # }
2634    /// ```
2635    #[cfg(feature = "gql")]
2636    pub fn execute(&self, query: &str) -> Result<QueryResult> {
2637        self.require_lpg("GQL")?;
2638
2639        #[cfg(feature = "testing-statement-injection")]
2640        grafeo_common::testing::statement_failure::maybe_fail_statement().map_err(|e| {
2641            grafeo_common::utils::error::Error::Internal(format!("injected failure: {e}"))
2642        })?;
2643
2644        use crate::query::{
2645            binder::Binder, cache::CacheKey, optimizer::Optimizer, processor::QueryLanguage,
2646            translators::gql,
2647        };
2648
2649        let _span = grafeo_info_span!(
2650            "grafeo::session::execute",
2651            language = "gql",
2652            query_len = query.len(),
2653        );
2654
2655        #[cfg(not(target_arch = "wasm32"))]
2656        let start_time = std::time::Instant::now();
2657
2658        // Parse and translate, checking for session/schema commands first
2659        let translation = gql::translate_full(query)?;
2660        let logical_plan = match translation {
2661            gql::GqlTranslationResult::SessionCommand(cmd) => {
2662                return self.execute_session_command(cmd);
2663            }
2664            #[cfg(feature = "lpg")]
2665            gql::GqlTranslationResult::SchemaCommand(cmd) => {
2666                // All DDL requires Admin role
2667                self.require_permission(crate::auth::StatementKind::Admin)?;
2668                if *self.read_only_tx.lock() {
2669                    return Err(grafeo_common::utils::error::Error::Transaction(
2670                        grafeo_common::utils::error::TransactionError::ReadOnly,
2671                    ));
2672                }
2673                return self.execute_schema_command(cmd);
2674            }
2675            gql::GqlTranslationResult::Plan(plan) => {
2676                // Only walk the operator tree when it matters: non-admin
2677                // identities need permission checks, read-only transactions
2678                // need mutation blocking. Admin sessions in auto-commit mode
2679                // skip the tree walk entirely.
2680                let read_only = *self.read_only_tx.lock();
2681                let need_check = read_only || !self.identity.can_admin();
2682                let is_mutation = need_check && plan.root.has_mutations();
2683                if is_mutation {
2684                    self.require_permission(crate::auth::StatementKind::Write)?;
2685                }
2686                if read_only && is_mutation {
2687                    return Err(grafeo_common::utils::error::Error::Transaction(
2688                        grafeo_common::utils::error::TransactionError::ReadOnly,
2689                    ));
2690                }
2691                plan
2692            }
2693            #[cfg(not(feature = "lpg"))]
2694            gql::GqlTranslationResult::SchemaCommand(_) => {
2695                return Err(grafeo_common::utils::error::Error::Internal(
2696                    "Schema commands require the `lpg` feature".to_string(),
2697                ));
2698            }
2699        };
2700
2701        // Create cache key for this query
2702        let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
2703
2704        // Try to get cached optimized plan, or use the plan we just translated
2705        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2706            cached_plan
2707        } else {
2708            // Semantic validation
2709            let mut binder = Binder::new();
2710            let _binding_context = binder.bind(&logical_plan)?;
2711
2712            // Optimize the plan
2713            let active = self.active_store();
2714            let optimizer = Optimizer::from_graph_store(&*active);
2715            let plan = optimizer.optimize(logical_plan)?;
2716
2717            // Cache the optimized plan for future use
2718            self.query_cache.put_optimized(cache_key, plan.clone());
2719
2720            plan
2721        };
2722
2723        // Resolve the active store for query execution
2724        let active = self.active_store();
2725
2726        // EXPLAIN: annotate pushdown hints and return the plan tree
2727        if optimized_plan.explain {
2728            use crate::query::processor::{annotate_pushdown_hints, explain_result};
2729            let mut plan = optimized_plan;
2730            annotate_pushdown_hints(&mut plan.root, active.as_ref());
2731            return Ok(explain_result(&plan));
2732        }
2733
2734        // PROFILE: execute with per-operator instrumentation
2735        if optimized_plan.profile {
2736            let has_mutations = optimized_plan.root.has_mutations();
2737            return self.with_auto_commit(has_mutations, || {
2738                let (viewing_epoch, transaction_id) = self.get_transaction_context();
2739                let planner = self.create_planner_for_store(
2740                    Arc::clone(&active),
2741                    viewing_epoch,
2742                    transaction_id,
2743                );
2744                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2745
2746                let executor = self.make_executor(physical_plan.columns.clone());
2747                let _result = executor.execute(physical_plan.operator.as_mut())?;
2748
2749                let total_time_ms;
2750                #[cfg(not(target_arch = "wasm32"))]
2751                {
2752                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2753                }
2754                #[cfg(target_arch = "wasm32")]
2755                {
2756                    total_time_ms = 0.0;
2757                }
2758
2759                let profile_tree = crate::query::profile::build_profile_tree(
2760                    &optimized_plan.root,
2761                    &mut entries.into_iter(),
2762                );
2763                Ok(crate::query::profile::profile_result(
2764                    &profile_tree,
2765                    total_time_ms,
2766                ))
2767            });
2768        }
2769
2770        let has_mutations = optimized_plan.root.has_mutations();
2771
2772        let result = self.with_auto_commit(has_mutations, || {
2773            // Get transaction context for MVCC visibility
2774            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2775
2776            // Convert to physical plan with transaction context
2777            // (Physical planning cannot be cached as it depends on transaction state)
2778            // Safe to use read-only fast path when: this query has no mutations AND
2779            // there is no active transaction that may have prior uncommitted writes.
2780            let has_active_tx = self.current_transaction.lock().is_some();
2781            let read_only = !has_mutations && !has_active_tx;
2782            let planner = self.create_planner_for_store_with_read_only(
2783                Arc::clone(&active),
2784                viewing_epoch,
2785                transaction_id,
2786                read_only,
2787            );
2788            let physical_plan = planner.plan(&optimized_plan)?;
2789
2790            // Execute the plan via push-based pipeline when possible
2791            let executor = self.make_executor(physical_plan.columns.clone());
2792            let (mut source, push_ops) = {
2793                #[cfg(feature = "spill")]
2794                {
2795                    let memory_ctx = self.make_operator_memory_context();
2796                    grafeo_core::execution::pipeline_convert::convert_to_pipeline_with_memory(
2797                        physical_plan.into_operator(),
2798                        memory_ctx,
2799                    )
2800                }
2801                #[cfg(not(feature = "spill"))]
2802                {
2803                    grafeo_core::execution::pipeline_convert::convert_to_pipeline(
2804                        physical_plan.into_operator(),
2805                    )
2806                }
2807            };
2808            let mut result = if push_ops.is_empty() {
2809                // Pure source query: use traditional pull-based execution
2810                executor.execute(source.as_mut())?
2811            } else {
2812                // Pipeline execution: push data from source through operators
2813                executor.execute_pipeline(source, push_ops)?
2814            };
2815
2816            // Add execution metrics
2817            let rows_scanned = result.rows.len() as u64;
2818            #[cfg(not(target_arch = "wasm32"))]
2819            {
2820                let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2821                result.execution_time_ms = Some(elapsed_ms);
2822            }
2823            result.rows_scanned = Some(rows_scanned);
2824
2825            Ok(result)
2826        });
2827
2828        // Record metrics for this query execution.
2829        #[cfg(feature = "metrics")]
2830        {
2831            #[cfg(not(target_arch = "wasm32"))]
2832            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2833            #[cfg(target_arch = "wasm32")]
2834            let elapsed_ms = None;
2835            self.record_query_metrics("gql", elapsed_ms, &result);
2836        }
2837
2838        result
2839    }
2840
2841    /// Executes a GQL query and returns a lazy result stream.
2842    ///
2843    /// The stream pulls chunks from the operator pipeline on demand. Use it
2844    /// when the result set is too large to fit in memory or when you want
2845    /// first-row latency (process rows as they arrive rather than waiting
2846    /// for the whole query to complete).
2847    ///
2848    /// This is the read-only, pull-only path: `execute_streaming` rejects
2849    /// mutations (INSERT/DELETE/SET), schema/session commands, EXPLAIN,
2850    /// PROFILE, and queries whose planner emits a push-based pipeline. Run
2851    /// those via [`execute`](Self::execute) instead.
2852    ///
2853    /// # Stability: Experimental
2854    ///
2855    /// New in 0.5.40. Signature may change before being promoted to Beta.
2856    ///
2857    /// # Errors
2858    ///
2859    /// Returns an error if parsing or planning fails, if the query is a
2860    /// kind that cannot be streamed, or if permission checks fail.
2861    #[cfg(all(feature = "gql", feature = "lpg"))]
2862    pub fn execute_streaming(
2863        &self,
2864        query: &str,
2865    ) -> Result<crate::query::executor::stream::ResultStream<'_>> {
2866        use crate::query::executor::stream::{ResultStream, StreamGuard};
2867
2868        let (source, columns, deadline) = self.build_streaming_plan(query)?;
2869        let guard = StreamGuard::new(&self.active_streams);
2870        Ok(ResultStream::new(source, columns, deadline, guard))
2871    }
2872
2873    /// Builds a pull-based physical pipeline for streaming, returning the
2874    /// root operator and metadata needed to wrap it in a `ResultStream` or
2875    /// `OwnedResultStream`.
2876    ///
2877    /// Rejects session/schema commands, mutations, EXPLAIN/PROFILE, and plans
2878    /// that require a push-based pipeline. Used by both
2879    /// [`Session::execute_streaming`] and [`GrafeoDB::execute_streaming`].
2880    #[cfg(all(feature = "gql", feature = "lpg"))]
2881    pub(crate) fn build_streaming_plan(
2882        &self,
2883        query: &str,
2884    ) -> Result<(
2885        Box<dyn grafeo_core::execution::operators::Operator>,
2886        Vec<String>,
2887        Option<Instant>,
2888    )> {
2889        use crate::query::{
2890            binder::Binder, cache::CacheKey, optimizer::Optimizer, processor::QueryLanguage,
2891            translators::gql,
2892        };
2893
2894        self.require_lpg("GQL")?;
2895
2896        let _span = grafeo_info_span!(
2897            "grafeo::session::execute_streaming",
2898            language = "gql",
2899            query_len = query.len(),
2900        );
2901
2902        // Parse and translate, rejecting anything that isn't a streamable query.
2903        let translation = gql::translate_full(query)?;
2904        let logical_plan = match translation {
2905            gql::GqlTranslationResult::SessionCommand(_) => {
2906                return Err(grafeo_common::utils::error::Error::Query(
2907                    grafeo_common::utils::error::QueryError::new(
2908                        grafeo_common::utils::error::QueryErrorKind::Semantic,
2909                        "session commands cannot be streamed; use execute() instead",
2910                    ),
2911                ));
2912            }
2913            gql::GqlTranslationResult::SchemaCommand(_) => {
2914                return Err(grafeo_common::utils::error::Error::Query(
2915                    grafeo_common::utils::error::QueryError::new(
2916                        grafeo_common::utils::error::QueryErrorKind::Semantic,
2917                        "schema DDL cannot be streamed; use execute() instead",
2918                    ),
2919                ));
2920            }
2921            gql::GqlTranslationResult::Plan(plan) => {
2922                if plan.root.has_mutations() {
2923                    return Err(grafeo_common::utils::error::Error::Query(
2924                        grafeo_common::utils::error::QueryError::new(
2925                            grafeo_common::utils::error::QueryErrorKind::Semantic,
2926                            "mutating queries cannot be streamed; use execute() instead",
2927                        ),
2928                    ));
2929                }
2930                if !self.identity.can_admin() {
2931                    self.require_permission(crate::auth::StatementKind::Read)?;
2932                }
2933                plan
2934            }
2935        };
2936
2937        // Cache + bind + optimize (same path as execute).
2938        let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
2939        let optimized_plan = if let Some(cached) = self.query_cache.get_optimized(&cache_key) {
2940            cached
2941        } else {
2942            let mut binder = Binder::new();
2943            let _binding_context = binder.bind(&logical_plan)?;
2944            let active = self.active_store();
2945            let optimizer = Optimizer::from_graph_store(&*active);
2946            let plan = optimizer.optimize(logical_plan)?;
2947            self.query_cache.put_optimized(cache_key, plan.clone());
2948            plan
2949        };
2950
2951        if optimized_plan.explain || optimized_plan.profile {
2952            return Err(grafeo_common::utils::error::Error::Query(
2953                grafeo_common::utils::error::QueryError::new(
2954                    grafeo_common::utils::error::QueryErrorKind::Semantic,
2955                    "EXPLAIN and PROFILE cannot be streamed; use execute() instead",
2956                ),
2957            ));
2958        }
2959
2960        // Plan to physical operators.
2961        let active = self.active_store();
2962        let has_active_tx = self.current_transaction.lock().is_some();
2963        let (viewing_epoch, transaction_id) = self.get_transaction_context();
2964        let planner = self.create_planner_for_store_with_read_only(
2965            Arc::clone(&active),
2966            viewing_epoch,
2967            transaction_id,
2968            !has_active_tx,
2969        );
2970        let physical_plan = planner.plan(&optimized_plan)?;
2971        let columns = physical_plan.columns.clone();
2972
2973        // Streaming only supports the pure pull path. Reject plans that would
2974        // require push operators (Sort, Aggregate, Distinct, etc.) so we never
2975        // silently materialize under the hood.
2976        let (source, push_ops) = {
2977            #[cfg(feature = "spill")]
2978            {
2979                let memory_ctx = self.make_operator_memory_context();
2980                grafeo_core::execution::pipeline_convert::convert_to_pipeline_with_memory(
2981                    physical_plan.into_operator(),
2982                    memory_ctx,
2983                )
2984            }
2985            #[cfg(not(feature = "spill"))]
2986            {
2987                grafeo_core::execution::pipeline_convert::convert_to_pipeline(
2988                    physical_plan.into_operator(),
2989                )
2990            }
2991        };
2992        if !push_ops.is_empty() {
2993            return Err(grafeo_common::utils::error::Error::Query(
2994                grafeo_common::utils::error::QueryError::new(
2995                    grafeo_common::utils::error::QueryErrorKind::Semantic,
2996                    "query requires a push-based pipeline (ORDER BY / aggregate / DISTINCT) \
2997                     which cannot be streamed; use execute() instead",
2998                ),
2999            ));
3000        }
3001
3002        Ok((source, columns, self.query_deadline()))
3003    }
3004
3005    /// Executes a GQL query with visibility at the specified epoch.
3006    ///
3007    /// This enables time-travel queries: the query sees the database
3008    /// as it existed at the given epoch.
3009    ///
3010    /// # Errors
3011    ///
3012    /// Returns an error if parsing or execution fails.
3013    #[cfg(feature = "gql")]
3014    pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
3015        let previous = self.viewing_epoch_override.lock().replace(epoch);
3016        let result = self.execute(query);
3017        *self.viewing_epoch_override.lock() = previous;
3018        result
3019    }
3020
3021    /// Executes a GQL query at a specific epoch with optional parameters.
3022    ///
3023    /// Combines epoch-based time travel with parameterized queries.
3024    ///
3025    /// # Errors
3026    ///
3027    /// Returns an error if parsing or execution fails.
3028    #[cfg(feature = "gql")]
3029    pub fn execute_at_epoch_with_params(
3030        &self,
3031        query: &str,
3032        epoch: EpochId,
3033        params: Option<std::collections::HashMap<String, Value>>,
3034    ) -> Result<QueryResult> {
3035        let previous = self.viewing_epoch_override.lock().replace(epoch);
3036        let result = if let Some(p) = params {
3037            self.execute_with_params(query, p)
3038        } else {
3039            self.execute(query)
3040        };
3041        *self.viewing_epoch_override.lock() = previous;
3042        result
3043    }
3044
3045    /// Executes a GQL query with parameters.
3046    ///
3047    /// # Errors
3048    ///
3049    /// Returns an error if the query fails to parse or execute.
3050    #[cfg(feature = "gql")]
3051    pub fn execute_with_params(
3052        &self,
3053        query: &str,
3054        params: std::collections::HashMap<String, Value>,
3055    ) -> Result<QueryResult> {
3056        self.require_lpg("GQL")?;
3057
3058        use crate::query::processor::{QueryLanguage, QueryProcessor};
3059
3060        // Reject writes if the identity lacks permission. Parse the query
3061        // to determine mutation status reliably (the text heuristic has false
3062        // negatives that could bypass authorization).
3063        let has_mutations = if self.identity.can_write() {
3064            // Fast path: identity can write, use heuristic for auto-commit only
3065            Self::query_looks_like_mutation(query)
3066        } else {
3067            // Restricted identity: parse to check mutations reliably
3068            use crate::query::translators::gql;
3069            match gql::translate(query) {
3070                Ok(plan) if plan.root.has_mutations() => {
3071                    self.require_permission(crate::auth::StatementKind::Write)?;
3072                    true
3073                }
3074                Ok(_) => false,
3075                // Parse error: let the processor handle it below
3076                Err(_) => Self::query_looks_like_mutation(query),
3077            }
3078        };
3079        let active = self.active_store();
3080
3081        self.with_auto_commit(has_mutations, || {
3082            // Get transaction context for MVCC visibility
3083            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3084
3085            // Create processor with transaction context
3086            let processor = QueryProcessor::for_stores_with_transaction(
3087                Arc::clone(&active),
3088                self.active_write_store(),
3089                Arc::clone(&self.transaction_manager),
3090            )?;
3091
3092            // Apply transaction context if in a transaction
3093            let processor = if let Some(transaction_id) = transaction_id {
3094                processor.with_transaction_context(viewing_epoch, transaction_id)
3095            } else {
3096                processor
3097            };
3098
3099            processor.process(query, QueryLanguage::Gql, Some(&params))
3100        })
3101    }
3102
3103    /// Executes a GQL query with parameters.
3104    ///
3105    /// # Errors
3106    ///
3107    /// Returns an error if no query language is enabled.
3108    #[cfg(not(any(feature = "gql", feature = "cypher")))]
3109    pub fn execute_with_params(
3110        &self,
3111        _query: &str,
3112        _params: std::collections::HashMap<String, Value>,
3113    ) -> Result<QueryResult> {
3114        Err(grafeo_common::utils::error::Error::Internal(
3115            "No query language enabled".to_string(),
3116        ))
3117    }
3118
3119    /// Executes a GQL query.
3120    ///
3121    /// # Errors
3122    ///
3123    /// Returns an error if no query language is enabled.
3124    #[cfg(not(any(feature = "gql", feature = "cypher")))]
3125    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
3126        Err(grafeo_common::utils::error::Error::Internal(
3127            "No query language enabled".to_string(),
3128        ))
3129    }
3130
3131    /// Executes a Cypher query.
3132    ///
3133    /// # Errors
3134    ///
3135    /// Returns an error if the query fails to parse or execute.
3136    #[cfg(feature = "cypher")]
3137    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
3138        use crate::query::{
3139            binder::Binder, cache::CacheKey, optimizer::Optimizer, processor::QueryLanguage,
3140            translators::cypher,
3141        };
3142
3143        // Handle schema DDL and SHOW commands before the normal query path
3144        let translation = cypher::translate_full(query)?;
3145        match translation {
3146            #[cfg(feature = "lpg")]
3147            cypher::CypherTranslationResult::SchemaCommand(cmd) => {
3148                use grafeo_common::utils::error::{
3149                    Error as GrafeoError, QueryError, QueryErrorKind,
3150                };
3151                self.require_permission(crate::auth::StatementKind::Admin)?;
3152                if *self.read_only_tx.lock() {
3153                    return Err(GrafeoError::Query(QueryError::new(
3154                        QueryErrorKind::Semantic,
3155                        "Cannot execute schema DDL in a read-only transaction",
3156                    )));
3157                }
3158                return self.execute_schema_command(cmd);
3159            }
3160            #[cfg(not(feature = "lpg"))]
3161            cypher::CypherTranslationResult::SchemaCommand(_) => {
3162                return Err(grafeo_common::utils::error::Error::Internal(
3163                    "Schema DDL requires the `lpg` feature".to_string(),
3164                ));
3165            }
3166            cypher::CypherTranslationResult::ShowIndexes => {
3167                return self.execute_show_indexes();
3168            }
3169            cypher::CypherTranslationResult::ShowConstraints => {
3170                return self.execute_show_constraints();
3171            }
3172            cypher::CypherTranslationResult::ShowCurrentGraphType => {
3173                return self.execute_show_current_graph_type();
3174            }
3175            cypher::CypherTranslationResult::Plan(_) => {
3176                // Fall through to normal execution below
3177            }
3178        }
3179
3180        #[cfg(not(target_arch = "wasm32"))]
3181        let start_time = std::time::Instant::now();
3182
3183        // Create cache key for this query
3184        let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());
3185
3186        // Try to get cached optimized plan
3187        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
3188            cached_plan
3189        } else {
3190            // Parse and translate the query to a logical plan
3191            let logical_plan = cypher::translate(query)?;
3192
3193            // Semantic validation
3194            let mut binder = Binder::new();
3195            let _binding_context = binder.bind(&logical_plan)?;
3196
3197            // Optimize the plan
3198            let active = self.active_store();
3199            let optimizer = Optimizer::from_graph_store(&*active);
3200            let plan = optimizer.optimize(logical_plan)?;
3201
3202            // Cache the optimized plan
3203            self.query_cache.put_optimized(cache_key, plan.clone());
3204
3205            plan
3206        };
3207
3208        // Check role-based permission for mutations
3209        if optimized_plan.root.has_mutations() {
3210            self.require_permission(crate::auth::StatementKind::Write)?;
3211        }
3212
3213        // Resolve the active store for query execution
3214        let active = self.active_store();
3215
3216        // EXPLAIN
3217        if optimized_plan.explain {
3218            use crate::query::processor::{annotate_pushdown_hints, explain_result};
3219            let mut plan = optimized_plan;
3220            annotate_pushdown_hints(&mut plan.root, active.as_ref());
3221            return Ok(explain_result(&plan));
3222        }
3223
3224        // PROFILE
3225        if optimized_plan.profile {
3226            let has_mutations = optimized_plan.root.has_mutations();
3227            return self.with_auto_commit(has_mutations, || {
3228                let (viewing_epoch, transaction_id) = self.get_transaction_context();
3229                let planner = self.create_planner_for_store(
3230                    Arc::clone(&active),
3231                    viewing_epoch,
3232                    transaction_id,
3233                );
3234                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
3235
3236                let executor = self.make_executor(physical_plan.columns.clone());
3237                let _result = executor.execute(physical_plan.operator.as_mut())?;
3238
3239                let total_time_ms;
3240                #[cfg(not(target_arch = "wasm32"))]
3241                {
3242                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
3243                }
3244                #[cfg(target_arch = "wasm32")]
3245                {
3246                    total_time_ms = 0.0;
3247                }
3248
3249                let profile_tree = crate::query::profile::build_profile_tree(
3250                    &optimized_plan.root,
3251                    &mut entries.into_iter(),
3252                );
3253                Ok(crate::query::profile::profile_result(
3254                    &profile_tree,
3255                    total_time_ms,
3256                ))
3257            });
3258        }
3259
3260        let has_mutations = optimized_plan.root.has_mutations();
3261
3262        let result = self.with_auto_commit(has_mutations, || {
3263            // Get transaction context for MVCC visibility
3264            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3265
3266            // Convert to physical plan with transaction context
3267            let planner =
3268                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3269            let mut physical_plan = planner.plan(&optimized_plan)?;
3270
3271            // Execute the plan
3272            let executor = self.make_executor(physical_plan.columns.clone());
3273            executor.execute(physical_plan.operator.as_mut())
3274        });
3275
3276        #[cfg(feature = "metrics")]
3277        {
3278            #[cfg(not(target_arch = "wasm32"))]
3279            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3280            #[cfg(target_arch = "wasm32")]
3281            let elapsed_ms = None;
3282            self.record_query_metrics("cypher", elapsed_ms, &result);
3283        }
3284
3285        result
3286    }
3287
3288    /// Executes a Gremlin query.
3289    ///
3290    /// # Errors
3291    ///
3292    /// Returns an error if the query fails to parse or execute.
3293    ///
3294    /// # Examples
3295    ///
3296    /// ```no_run
3297    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3298    /// use grafeo_engine::GrafeoDB;
3299    ///
3300    /// let db = GrafeoDB::new_in_memory();
3301    /// let session = db.session();
3302    ///
3303    /// // Create some nodes first
3304    /// session.create_node(&["Person"]);
3305    ///
3306    /// // Query using Gremlin
3307    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
3308    /// # Ok(())
3309    /// # }
3310    /// ```
3311    #[cfg(feature = "gremlin")]
3312    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
3313        use crate::query::{binder::Binder, optimizer::Optimizer, translators::gremlin};
3314
3315        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3316        let start_time = Instant::now();
3317
3318        // Parse and translate the query to a logical plan
3319        let logical_plan = gremlin::translate(query)?;
3320
3321        // Semantic validation
3322        let mut binder = Binder::new();
3323        let _binding_context = binder.bind(&logical_plan)?;
3324
3325        // Optimize the plan
3326        let active = self.active_store();
3327        let optimizer = Optimizer::from_graph_store(&*active);
3328        let optimized_plan = optimizer.optimize(logical_plan)?;
3329
3330        let has_mutations = optimized_plan.root.has_mutations();
3331        if has_mutations {
3332            self.require_permission(crate::auth::StatementKind::Write)?;
3333        }
3334
3335        let result = self.with_auto_commit(has_mutations, || {
3336            // Get transaction context for MVCC visibility
3337            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3338
3339            // Convert to physical plan with transaction context
3340            let planner =
3341                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3342            let mut physical_plan = planner.plan(&optimized_plan)?;
3343
3344            // Execute the plan
3345            let executor = self.make_executor(physical_plan.columns.clone());
3346            executor.execute(physical_plan.operator.as_mut())
3347        });
3348
3349        #[cfg(feature = "metrics")]
3350        {
3351            #[cfg(not(target_arch = "wasm32"))]
3352            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3353            #[cfg(target_arch = "wasm32")]
3354            let elapsed_ms = None;
3355            self.record_query_metrics("gremlin", elapsed_ms, &result);
3356        }
3357
3358        result
3359    }
3360
3361    /// Executes a Gremlin query with parameters.
3362    ///
3363    /// # Errors
3364    ///
3365    /// Returns an error if the query fails to parse or execute.
3366    #[cfg(feature = "gremlin")]
3367    pub fn execute_gremlin_with_params(
3368        &self,
3369        query: &str,
3370        params: std::collections::HashMap<String, Value>,
3371    ) -> Result<QueryResult> {
3372        use crate::query::{
3373            binder::Binder, optimizer::Optimizer, processor::substitute_params,
3374            translators::gremlin,
3375        };
3376
3377        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3378        let start_time = Instant::now();
3379
3380        // Parse and translate the query to a logical plan
3381        let mut logical_plan = gremlin::translate(query)?;
3382
3383        // Substitute parameters
3384        substitute_params(&mut logical_plan, &params)?;
3385
3386        // Semantic validation
3387        let mut binder = Binder::new();
3388        let _binding_context = binder.bind(&logical_plan)?;
3389
3390        // Optimize the plan
3391        let active = self.active_store();
3392        let optimizer = Optimizer::from_graph_store(&*active);
3393        let optimized_plan = optimizer.optimize(logical_plan)?;
3394
3395        let has_mutations = optimized_plan.root.has_mutations();
3396        if has_mutations {
3397            self.require_permission(crate::auth::StatementKind::Write)?;
3398        }
3399
3400        let result = self.with_auto_commit(has_mutations, || {
3401            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3402            let planner =
3403                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3404            let mut physical_plan = planner.plan(&optimized_plan)?;
3405            let executor = self.make_executor(physical_plan.columns.clone());
3406            executor.execute(physical_plan.operator.as_mut())
3407        });
3408
3409        #[cfg(feature = "metrics")]
3410        {
3411            #[cfg(not(target_arch = "wasm32"))]
3412            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3413            #[cfg(target_arch = "wasm32")]
3414            let elapsed_ms = None;
3415            self.record_query_metrics("gremlin", elapsed_ms, &result);
3416        }
3417
3418        result
3419    }
3420
3421    /// Executes a GraphQL query against the LPG store.
3422    ///
3423    /// # Errors
3424    ///
3425    /// Returns an error if the query fails to parse or execute.
3426    ///
3427    /// # Examples
3428    ///
3429    /// ```no_run
3430    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3431    /// use grafeo_engine::GrafeoDB;
3432    ///
3433    /// let db = GrafeoDB::new_in_memory();
3434    /// let session = db.session();
3435    ///
3436    /// // Create some nodes first
3437    /// session.create_node(&["User"]);
3438    ///
3439    /// // Query using GraphQL
3440    /// let result = session.execute_graphql("query { user { id name } }")?;
3441    /// # Ok(())
3442    /// # }
3443    /// ```
3444    #[cfg(feature = "graphql")]
3445    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
3446        use crate::query::{
3447            binder::Binder, optimizer::Optimizer, processor::substitute_params,
3448            translators::graphql,
3449        };
3450
3451        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3452        let start_time = Instant::now();
3453
3454        let mut logical_plan = graphql::translate(query)?;
3455
3456        // Substitute default parameter values from variable declarations
3457        if !logical_plan.default_params.is_empty() {
3458            let defaults = logical_plan.default_params.clone();
3459            substitute_params(&mut logical_plan, &defaults)?;
3460        }
3461
3462        let mut binder = Binder::new();
3463        let _binding_context = binder.bind(&logical_plan)?;
3464
3465        let active = self.active_store();
3466        let optimizer = Optimizer::from_graph_store(&*active);
3467        let optimized_plan = optimizer.optimize(logical_plan)?;
3468        let has_mutations = optimized_plan.root.has_mutations();
3469        if has_mutations {
3470            self.require_permission(crate::auth::StatementKind::Write)?;
3471        }
3472
3473        let result = self.with_auto_commit(has_mutations, || {
3474            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3475            let planner =
3476                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3477            let mut physical_plan = planner.plan(&optimized_plan)?;
3478            let executor = self.make_executor(physical_plan.columns.clone());
3479            executor.execute(physical_plan.operator.as_mut())
3480        });
3481
3482        #[cfg(feature = "metrics")]
3483        {
3484            #[cfg(not(target_arch = "wasm32"))]
3485            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3486            #[cfg(target_arch = "wasm32")]
3487            let elapsed_ms = None;
3488            self.record_query_metrics("graphql", elapsed_ms, &result);
3489        }
3490
3491        result
3492    }
3493
3494    /// Executes a GraphQL query with parameters.
3495    ///
3496    /// # Errors
3497    ///
3498    /// Returns an error if the query fails to parse or execute.
3499    #[cfg(feature = "graphql")]
3500    pub fn execute_graphql_with_params(
3501        &self,
3502        query: &str,
3503        params: std::collections::HashMap<String, Value>,
3504    ) -> Result<QueryResult> {
3505        use crate::query::{
3506            binder::Binder, optimizer::Optimizer, processor::substitute_params,
3507            translators::graphql,
3508        };
3509
3510        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3511        let start_time = Instant::now();
3512
3513        // Parse and translate the query to a logical plan
3514        let mut logical_plan = graphql::translate(query)?;
3515
3516        // Merge default params with caller-supplied params
3517        if !logical_plan.default_params.is_empty() {
3518            let mut merged = logical_plan.default_params.clone();
3519            merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
3520            substitute_params(&mut logical_plan, &merged)?;
3521        } else {
3522            substitute_params(&mut logical_plan, &params)?;
3523        }
3524
3525        // Semantic validation
3526        let mut binder = Binder::new();
3527        let _binding_context = binder.bind(&logical_plan)?;
3528
3529        // Optimize the plan
3530        let active = self.active_store();
3531        let optimizer = Optimizer::from_graph_store(&*active);
3532        let optimized_plan = optimizer.optimize(logical_plan)?;
3533
3534        let has_mutations = optimized_plan.root.has_mutations();
3535        if has_mutations {
3536            self.require_permission(crate::auth::StatementKind::Write)?;
3537        }
3538
3539        let result = self.with_auto_commit(has_mutations, || {
3540            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3541            let planner =
3542                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3543            let mut physical_plan = planner.plan(&optimized_plan)?;
3544            let executor = self.make_executor(physical_plan.columns.clone());
3545            executor.execute(physical_plan.operator.as_mut())
3546        });
3547
3548        #[cfg(feature = "metrics")]
3549        {
3550            #[cfg(not(target_arch = "wasm32"))]
3551            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3552            #[cfg(target_arch = "wasm32")]
3553            let elapsed_ms = None;
3554            self.record_query_metrics("graphql", elapsed_ms, &result);
3555        }
3556
3557        result
3558    }
3559
3560    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
3561    ///
3562    /// # Errors
3563    ///
3564    /// Returns an error if the query fails to parse or execute.
3565    ///
3566    /// # Examples
3567    ///
3568    /// ```no_run
3569    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3570    /// use grafeo_engine::GrafeoDB;
3571    ///
3572    /// let db = GrafeoDB::new_in_memory();
3573    /// let session = db.session();
3574    ///
3575    /// let result = session.execute_sql(
3576    ///     "SELECT * FROM GRAPH_TABLE (
3577    ///         MATCH (n:Person)
3578    ///         COLUMNS (n.name AS name)
3579    ///     )"
3580    /// )?;
3581    /// # Ok(())
3582    /// # }
3583    /// ```
3584    #[cfg(feature = "sql-pgq")]
3585    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
3586        use crate::query::{
3587            binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
3588            processor::QueryLanguage, translators::sql_pgq,
3589        };
3590
3591        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3592        let start_time = Instant::now();
3593
3594        // Parse and translate (always needed to check for DDL)
3595        let logical_plan = sql_pgq::translate(query)?;
3596
3597        // Handle DDL statements directly (they don't go through the query pipeline)
3598        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
3599            self.require_permission(crate::auth::StatementKind::Admin)?;
3600            return Ok(QueryResult {
3601                columns: vec!["status".into()],
3602                column_types: vec![grafeo_common::types::LogicalType::String],
3603                rows: vec![vec![Value::from(format!(
3604                    "Property graph '{}' created ({} node tables, {} edge tables)",
3605                    cpg.name,
3606                    cpg.node_tables.len(),
3607                    cpg.edge_tables.len()
3608                ))]],
3609                execution_time_ms: None,
3610                rows_scanned: None,
3611                status_message: None,
3612                gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
3613            });
3614        }
3615
3616        let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());
3617
3618        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
3619            cached_plan
3620        } else {
3621            let mut binder = Binder::new();
3622            let _binding_context = binder.bind(&logical_plan)?;
3623            let active = self.active_store();
3624            let optimizer = Optimizer::from_graph_store(&*active);
3625            let plan = optimizer.optimize(logical_plan)?;
3626            self.query_cache.put_optimized(cache_key, plan.clone());
3627            plan
3628        };
3629
3630        let active = self.active_store();
3631        let has_mutations = optimized_plan.root.has_mutations();
3632        if has_mutations {
3633            self.require_permission(crate::auth::StatementKind::Write)?;
3634        }
3635
3636        let result = self.with_auto_commit(has_mutations, || {
3637            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3638            let planner =
3639                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3640            let mut physical_plan = planner.plan(&optimized_plan)?;
3641            let executor = self.make_executor(physical_plan.columns.clone());
3642            executor.execute(physical_plan.operator.as_mut())
3643        });
3644
3645        #[cfg(feature = "metrics")]
3646        {
3647            #[cfg(not(target_arch = "wasm32"))]
3648            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3649            #[cfg(target_arch = "wasm32")]
3650            let elapsed_ms = None;
3651            self.record_query_metrics("sql", elapsed_ms, &result);
3652        }
3653
3654        result
3655    }
3656
3657    /// Executes a SQL/PGQ query with parameters.
3658    ///
3659    /// # Errors
3660    ///
3661    /// Returns an error if the query fails to parse or execute.
3662    #[cfg(feature = "sql-pgq")]
3663    pub fn execute_sql_with_params(
3664        &self,
3665        query: &str,
3666        params: std::collections::HashMap<String, Value>,
3667    ) -> Result<QueryResult> {
3668        use crate::query::processor::{QueryLanguage, QueryProcessor};
3669
3670        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3671        let start_time = Instant::now();
3672
3673        let has_mutations = if self.identity.can_write() {
3674            Self::query_looks_like_mutation(query)
3675        } else {
3676            use crate::query::translators::sql_pgq;
3677            match sql_pgq::translate(query) {
3678                Ok(plan) if plan.root.has_mutations() => {
3679                    self.require_permission(crate::auth::StatementKind::Write)?;
3680                    true
3681                }
3682                Ok(_) => false,
3683                Err(_) => Self::query_looks_like_mutation(query),
3684            }
3685        };
3686        if has_mutations {
3687            self.require_permission(crate::auth::StatementKind::Write)?;
3688        }
3689        let active = self.active_store();
3690
3691        let result = self.with_auto_commit(has_mutations, || {
3692            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3693            let processor = QueryProcessor::for_stores_with_transaction(
3694                Arc::clone(&active),
3695                self.active_write_store(),
3696                Arc::clone(&self.transaction_manager),
3697            )?;
3698            let processor = if let Some(transaction_id) = transaction_id {
3699                processor.with_transaction_context(viewing_epoch, transaction_id)
3700            } else {
3701                processor
3702            };
3703            processor.process(query, QueryLanguage::SqlPgq, Some(&params))
3704        });
3705
3706        #[cfg(feature = "metrics")]
3707        {
3708            #[cfg(not(target_arch = "wasm32"))]
3709            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3710            #[cfg(target_arch = "wasm32")]
3711            let elapsed_ms = None;
3712            self.record_query_metrics("sql", elapsed_ms, &result);
3713        }
3714
3715        result
3716    }
3717
3718    /// Executes a query in the specified language by name.
3719    ///
3720    /// Supported language names: `"gql"`, `"cypher"`, `"gremlin"`, `"graphql"`,
3721    /// `"graphql-rdf"`, `"sparql"`, `"sql"`. Each requires the corresponding feature flag.
3722    ///
3723    /// # Errors
3724    ///
3725    /// Returns an error if the language is unknown/disabled or the query fails.
3726    pub fn execute_language(
3727        &self,
3728        query: &str,
3729        language: &str,
3730        params: Option<std::collections::HashMap<String, Value>>,
3731    ) -> Result<QueryResult> {
3732        let _span = grafeo_info_span!(
3733            "grafeo::session::execute",
3734            language,
3735            query_len = query.len(),
3736        );
3737        match language {
3738            "gql" => {
3739                if let Some(p) = params {
3740                    self.execute_with_params(query, p)
3741                } else {
3742                    self.execute(query)
3743                }
3744            }
3745            #[cfg(feature = "cypher")]
3746            "cypher" => {
3747                if let Some(p) = params {
3748                    use crate::query::processor::{QueryLanguage, QueryProcessor};
3749
3750                    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3751                    let start_time = Instant::now();
3752
3753                    let has_mutations = if self.identity.can_write() {
3754                        Self::query_looks_like_mutation(query)
3755                    } else {
3756                        use crate::query::translators::cypher;
3757                        match cypher::translate(query) {
3758                            Ok(plan) if plan.root.has_mutations() => {
3759                                self.require_permission(crate::auth::StatementKind::Write)?;
3760                                true
3761                            }
3762                            Ok(_) => false,
3763                            Err(_) => Self::query_looks_like_mutation(query),
3764                        }
3765                    };
3766                    let active = self.active_store();
3767                    let result = self.with_auto_commit(has_mutations, || {
3768                        let processor = QueryProcessor::for_stores_with_transaction(
3769                            Arc::clone(&active),
3770                            self.active_write_store(),
3771                            Arc::clone(&self.transaction_manager),
3772                        )?;
3773                        let (viewing_epoch, transaction_id) = self.get_transaction_context();
3774                        let processor = if let Some(transaction_id) = transaction_id {
3775                            processor.with_transaction_context(viewing_epoch, transaction_id)
3776                        } else {
3777                            processor
3778                        };
3779                        processor.process(query, QueryLanguage::Cypher, Some(&p))
3780                    });
3781
3782                    #[cfg(feature = "metrics")]
3783                    {
3784                        #[cfg(not(target_arch = "wasm32"))]
3785                        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3786                        #[cfg(target_arch = "wasm32")]
3787                        let elapsed_ms = None;
3788                        self.record_query_metrics("cypher", elapsed_ms, &result);
3789                    }
3790
3791                    result
3792                } else {
3793                    self.execute_cypher(query)
3794                }
3795            }
3796            #[cfg(feature = "gremlin")]
3797            "gremlin" => {
3798                if let Some(p) = params {
3799                    self.execute_gremlin_with_params(query, p)
3800                } else {
3801                    self.execute_gremlin(query)
3802                }
3803            }
3804            #[cfg(feature = "graphql")]
3805            "graphql" => {
3806                if let Some(p) = params {
3807                    self.execute_graphql_with_params(query, p)
3808                } else {
3809                    self.execute_graphql(query)
3810                }
3811            }
3812            #[cfg(all(feature = "graphql", feature = "triple-store"))]
3813            "graphql-rdf" => {
3814                if let Some(p) = params {
3815                    self.execute_graphql_rdf_with_params(query, p)
3816                } else {
3817                    self.execute_graphql_rdf(query)
3818                }
3819            }
3820            #[cfg(feature = "sql-pgq")]
3821            "sql" | "sql-pgq" => {
3822                if let Some(p) = params {
3823                    self.execute_sql_with_params(query, p)
3824                } else {
3825                    self.execute_sql(query)
3826                }
3827            }
3828            #[cfg(all(feature = "sparql", feature = "triple-store"))]
3829            "sparql" => {
3830                if let Some(p) = params {
3831                    self.execute_sparql_with_params(query, p)
3832                } else {
3833                    self.execute_sparql(query)
3834                }
3835            }
3836            other => Err(grafeo_common::utils::error::Error::Query(
3837                grafeo_common::utils::error::QueryError::new(
3838                    grafeo_common::utils::error::QueryErrorKind::Semantic,
3839                    format!("Unknown query language: '{other}'"),
3840                ),
3841            )),
3842        }
3843    }
3844
3845    /// Begins a new transaction.
3846    ///
3847    /// # Errors
3848    ///
3849    /// Returns an error if a transaction is already active.
3850    ///
3851    /// # Examples
3852    ///
3853    /// ```no_run
3854    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3855    /// use grafeo_engine::GrafeoDB;
3856    ///
3857    /// let db = GrafeoDB::new_in_memory();
3858    /// let mut session = db.session();
3859    ///
3860    /// session.begin_transaction()?;
3861    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3862    /// session.execute("INSERT (:Person {name: 'Gus'})")?;
3863    /// session.commit()?; // Both inserts committed atomically
3864    /// # Ok(())
3865    /// # }
3866    /// ```
3867    /// Clears all cached query plans.
3868    ///
3869    /// The plan cache is shared across all sessions on the same database,
3870    /// so clearing from one session affects all sessions.
3871    pub fn clear_plan_cache(&self) {
3872        self.query_cache.clear();
3873    }
3874
3875    /// Begins a new transaction on this session.
3876    ///
3877    /// Uses the default isolation level (`SnapshotIsolation`).
3878    ///
3879    /// # Errors
3880    ///
3881    /// Returns an error if a transaction is already active.
3882    #[cfg(feature = "lpg")]
3883    pub fn begin_transaction(&mut self) -> Result<()> {
3884        self.begin_transaction_inner(false, None)
3885    }
3886
3887    /// Begins a transaction with a specific isolation level.
3888    ///
3889    /// See [`begin_transaction`](Self::begin_transaction) for the default (`SnapshotIsolation`).
3890    ///
3891    /// # Errors
3892    ///
3893    /// Returns an error if a transaction is already active.
3894    #[cfg(feature = "lpg")]
3895    pub fn begin_transaction_with_isolation(
3896        &mut self,
3897        isolation_level: crate::transaction::IsolationLevel,
3898    ) -> Result<()> {
3899        self.begin_transaction_inner(false, Some(isolation_level))
3900    }
3901
3902    /// Core transaction begin logic, usable from both `&mut self` and `&self` paths.
3903    #[cfg(feature = "lpg")]
3904    fn begin_transaction_inner(
3905        &self,
3906        read_only: bool,
3907        isolation_level: Option<crate::transaction::IsolationLevel>,
3908    ) -> Result<()> {
3909        let _span = grafeo_debug_span!("grafeo::tx::begin", read_only);
3910        let mut current = self.current_transaction.lock();
3911        if current.is_some() {
3912            // Nested transaction: create an auto-savepoint instead of a new tx.
3913            drop(current);
3914            let mut depth = self.transaction_nesting_depth.lock();
3915            *depth += 1;
3916            let sp_name = format!("_nested_tx_{}", *depth);
3917            self.savepoint(&sp_name)?;
3918            return Ok(());
3919        }
3920
3921        let active = self.active_lpg_store();
3922        self.transaction_start_node_count
3923            .store(active.node_count(), Ordering::Relaxed);
3924        self.transaction_start_edge_count
3925            .store(active.edge_count(), Ordering::Relaxed);
3926        let transaction_id = if let Some(level) = isolation_level {
3927            self.transaction_manager.begin_with_isolation(level)
3928        } else {
3929            self.transaction_manager.begin()
3930        };
3931        *current = Some(transaction_id);
3932        *self.read_only_tx.lock() = read_only || self.db_read_only;
3933
3934        // Record the initial graph as "touched" for cross-graph atomicity.
3935        // Uses the full storage key (schema/graph) for schema-scoped resolution.
3936        let key = self.active_graph_storage_key();
3937        let mut touched = self.touched_graphs.lock();
3938        touched.clear();
3939        touched.push(key);
3940
3941        #[cfg(feature = "metrics")]
3942        {
3943            crate::metrics::record_metric!(self.metrics, tx_active, inc);
3944            #[cfg(not(target_arch = "wasm32"))]
3945            {
3946                *self.tx_start_time.lock() = Some(Instant::now());
3947            }
3948        }
3949
3950        Ok(())
3951    }
3952
3953    /// Commits the current transaction.
3954    ///
3955    /// Makes all changes since [`begin_transaction`](Self::begin_transaction) permanent.
3956    ///
3957    /// # Errors
3958    ///
3959    /// Returns an error if no transaction is active.
3960    #[cfg(feature = "lpg")]
3961    pub fn commit(&mut self) -> Result<()> {
3962        self.commit_inner()
3963    }
3964
3965    /// Core commit logic, usable from both `&mut self` and `&self` paths.
3966    #[cfg(feature = "lpg")]
3967    fn commit_inner(&self) -> Result<()> {
3968        let _span = grafeo_debug_span!("grafeo::tx::commit");
3969
3970        #[cfg(feature = "testing-statement-injection")]
3971        if let Err(e) = grafeo_common::testing::statement_failure::maybe_fail_commit() {
3972            // Commit fails before any state is finalized. Treat it like any
3973            // other pre-prepare commit failure and auto-rollback so the
3974            // session returns to a clean, consistent state (matches real
3975            // DB semantics: commit failure implies the transaction is
3976            // aborted, not left in-flight).
3977            let _ = self.rollback_inner();
3978            return Err(grafeo_common::utils::error::Error::Internal(format!(
3979                "injected commit failure: {e}"
3980            )));
3981        }
3982
3983        self.check_no_active_streams("commit")?;
3984        // Nested transaction: release the auto-savepoint (changes are preserved).
3985        {
3986            let mut depth = self.transaction_nesting_depth.lock();
3987            if *depth > 0 {
3988                let sp_name = format!("_nested_tx_{depth}");
3989                *depth -= 1;
3990                drop(depth);
3991                return self.release_savepoint(&sp_name);
3992            }
3993        }
3994
3995        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3996            grafeo_common::utils::error::Error::Transaction(
3997                grafeo_common::utils::error::TransactionError::InvalidState(
3998                    "No active transaction".to_string(),
3999                ),
4000            )
4001        })?;
4002
4003        // Validate the transaction first (conflict detection) before committing data.
4004        // If this fails, we rollback the data changes instead of making them permanent.
4005        //
4006        // Take ownership of the touched graphs in one lock acquisition. Since
4007        // current_transaction was .take()'d above, no concurrent thread can call
4008        // track_graph_touch() for this transaction (it checks current_transaction
4009        // first), so this is safe.
4010        let touched = std::mem::take(&mut *self.touched_graphs.lock());
4011        let commit_epoch = match self.transaction_manager.commit(transaction_id) {
4012            Ok(epoch) => epoch,
4013            Err(e) => {
4014                // Conflict detected: rollback the data changes
4015                for graph_name in &touched {
4016                    let store = self.resolve_store(graph_name);
4017                    store.rollback_transaction_properties(transaction_id);
4018                }
4019                #[cfg(feature = "triple-store")]
4020                self.rollback_rdf_transaction(transaction_id);
4021                // Discard buffered CDC events on conflict rollback
4022                #[cfg(feature = "cdc")]
4023                if let Some(ref pending) = self.cdc_pending_events {
4024                    pending.lock().clear();
4025                }
4026                *self.read_only_tx.lock() = self.db_read_only;
4027                self.savepoints.lock().clear();
4028                self.touched_graphs.lock().clear();
4029                #[cfg(feature = "metrics")]
4030                {
4031                    crate::metrics::record_metric!(self.metrics, tx_active, dec);
4032                    crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
4033                    #[cfg(not(target_arch = "wasm32"))]
4034                    if let Some(start) = self.tx_start_time.lock().take() {
4035                        let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
4036                        crate::metrics::record_metric!(
4037                            self.metrics,
4038                            tx_duration,
4039                            observe duration_ms
4040                        );
4041                    }
4042                }
4043                return Err(e);
4044            }
4045        };
4046
4047        // Finalize PENDING epochs: make uncommitted versions visible at the commit epoch.
4048        for graph_name in &touched {
4049            let store = self.resolve_store(graph_name);
4050            store.finalize_version_epochs(transaction_id, commit_epoch);
4051        }
4052
4053        // Commit succeeded: discard undo logs (make changes permanent)
4054        #[cfg(feature = "triple-store")]
4055        self.commit_rdf_transaction(transaction_id);
4056
4057        for graph_name in &touched {
4058            let store = self.resolve_store(graph_name);
4059            store.commit_transaction_properties(transaction_id);
4060        }
4061
4062        // Flush buffered CDC events now that the transaction is committed.
4063        // All buffered events have PENDING epoch; assign the real commit_epoch.
4064        // Uses record_batch to acquire the write lock once per commit.
4065        #[cfg(feature = "cdc")]
4066        if let Some(ref pending) = self.cdc_pending_events {
4067            let events: Vec<crate::cdc::ChangeEvent> = pending.lock().drain(..).collect();
4068            self.cdc_log.record_batch(events.into_iter().map(|mut e| {
4069                e.epoch = commit_epoch;
4070                e
4071            }));
4072        }
4073
4074        // Log transaction commit and epoch advance to WAL so that crash
4075        // recovery can identify committed transactions and their epoch
4076        // boundaries. Without these markers, WAL recovery discards all
4077        // records as uncommitted (fixes #252 for the crash scenario).
4078        #[cfg(feature = "wal")]
4079        if let Some(ref wal) = self.wal {
4080            use grafeo_storage::wal::WalRecord;
4081            if let Err(e) = wal.log(&WalRecord::TransactionCommit { transaction_id }) {
4082                grafeo_warn!("Failed to log transaction commit to WAL: {}", e);
4083            }
4084            if let Err(e) = wal.log(&WalRecord::EpochAdvance {
4085                epoch: commit_epoch,
4086            }) {
4087                grafeo_warn!("Failed to log epoch advance to WAL: {}", e);
4088            }
4089        }
4090
4091        // Sync epoch for all touched graphs so that convenience lookups
4092        // (edge_type, get_edge, get_node) can see versions at the latest epoch.
4093        let current_epoch = self.transaction_manager.current_epoch();
4094        for graph_name in &touched {
4095            let store = self.resolve_store(graph_name);
4096            store.sync_epoch(current_epoch);
4097        }
4098
4099        // Reset read-only flag and clear savepoints.
4100        // touched_graphs was already emptied by mem::take above.
4101        *self.read_only_tx.lock() = self.db_read_only;
4102        self.savepoints.lock().clear();
4103
4104        // Auto-GC: periodically prune old MVCC versions
4105        if self.gc_interval > 0 {
4106            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
4107            if count.is_multiple_of(self.gc_interval) {
4108                #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
4109                let gc_start = std::time::Instant::now();
4110
4111                let min_epoch = self.transaction_manager.min_active_epoch();
4112                for graph_name in &touched {
4113                    let store = self.resolve_store(graph_name);
4114                    store.gc_versions(min_epoch);
4115                }
4116                self.transaction_manager.gc();
4117
4118                #[cfg(feature = "metrics")]
4119                {
4120                    crate::metrics::record_metric!(self.metrics, gc_runs, inc);
4121                    #[cfg(not(target_arch = "wasm32"))]
4122                    {
4123                        let gc_duration_ms = gc_start.elapsed().as_secs_f64() * 1000.0;
4124                        crate::metrics::record_metric!(
4125                            self.metrics,
4126                            gc_duration,
4127                            observe gc_duration_ms
4128                        );
4129                    }
4130                }
4131            }
4132        }
4133
4134        #[cfg(feature = "metrics")]
4135        {
4136            crate::metrics::record_metric!(self.metrics, tx_active, dec);
4137            crate::metrics::record_metric!(self.metrics, tx_committed, inc);
4138            #[cfg(not(target_arch = "wasm32"))]
4139            if let Some(start) = self.tx_start_time.lock().take() {
4140                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
4141                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
4142            }
4143        }
4144
4145        Ok(())
4146    }
4147
4148    /// Aborts the current transaction.
4149    ///
4150    /// Discards all changes since [`begin_transaction`](Self::begin_transaction).
4151    ///
4152    /// # Errors
4153    ///
4154    /// Returns an error if no transaction is active.
4155    ///
4156    /// # Examples
4157    ///
4158    /// ```no_run
4159    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
4160    /// use grafeo_engine::GrafeoDB;
4161    ///
4162    /// let db = GrafeoDB::new_in_memory();
4163    /// let mut session = db.session();
4164    ///
4165    /// session.begin_transaction()?;
4166    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
4167    /// session.rollback()?; // Insert is discarded
4168    /// # Ok(())
4169    /// # }
4170    /// ```
4171    #[cfg(feature = "lpg")]
4172    pub fn rollback(&mut self) -> Result<()> {
4173        self.rollback_inner()
4174    }
4175
4176    /// Core rollback logic, usable from both `&mut self` and `&self` paths.
4177    #[cfg(feature = "lpg")]
4178    fn rollback_inner(&self) -> Result<()> {
4179        let _span = grafeo_debug_span!("grafeo::tx::rollback");
4180        self.check_no_active_streams("rollback")?;
4181        // Nested transaction: rollback to the auto-savepoint.
4182        {
4183            let mut depth = self.transaction_nesting_depth.lock();
4184            if *depth > 0 {
4185                let sp_name = format!("_nested_tx_{depth}");
4186                *depth -= 1;
4187                drop(depth);
4188                return self.rollback_to_savepoint(&sp_name);
4189            }
4190        }
4191
4192        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
4193            grafeo_common::utils::error::Error::Transaction(
4194                grafeo_common::utils::error::TransactionError::InvalidState(
4195                    "No active transaction".to_string(),
4196                ),
4197            )
4198        })?;
4199
4200        // Reset read-only flag
4201        *self.read_only_tx.lock() = self.db_read_only;
4202
4203        // Discard uncommitted versions in ALL touched LPG stores (cross-graph atomicity).
4204        let touched = self.touched_graphs.lock().clone();
4205        for graph_name in &touched {
4206            let store = self.resolve_store(graph_name);
4207            store.discard_uncommitted_versions(transaction_id);
4208        }
4209
4210        // Discard pending operations in the RDF store
4211        #[cfg(feature = "triple-store")]
4212        self.rollback_rdf_transaction(transaction_id);
4213
4214        // Discard buffered CDC events on rollback
4215        #[cfg(feature = "cdc")]
4216        if let Some(ref pending) = self.cdc_pending_events {
4217            pending.lock().clear();
4218        }
4219
4220        // Clear savepoints and touched graphs
4221        self.savepoints.lock().clear();
4222        self.touched_graphs.lock().clear();
4223
4224        // Mark transaction as aborted in the manager
4225        let result = self.transaction_manager.abort(transaction_id);
4226
4227        // Log transaction abort to WAL so recovery clears any data records
4228        // emitted during this transaction (e.g. via session-direct mutation
4229        // APIs). Without this marker, recovery's per-transaction buffer
4230        // would carry the rolled-back records into the next
4231        // `TransactionCommit` and resurrect them on reopen.
4232        #[cfg(feature = "wal")]
4233        if let Some(ref wal) = self.wal {
4234            use grafeo_storage::wal::WalRecord;
4235            if let Err(e) = wal.log(&WalRecord::TransactionAbort { transaction_id }) {
4236                grafeo_warn!("Failed to log transaction abort to WAL: {}", e);
4237            }
4238        }
4239
4240        #[cfg(feature = "metrics")]
4241        if result.is_ok() {
4242            crate::metrics::record_metric!(self.metrics, tx_active, dec);
4243            crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
4244            #[cfg(not(target_arch = "wasm32"))]
4245            if let Some(start) = self.tx_start_time.lock().take() {
4246                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
4247                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
4248            }
4249        }
4250
4251        result
4252    }
4253
4254    /// Creates a named savepoint within the current transaction.
4255    ///
4256    /// The savepoint captures the current node/edge ID counters so that
4257    /// [`rollback_to_savepoint`](Self::rollback_to_savepoint) can discard
4258    /// entities created after this point.
4259    ///
4260    /// # Errors
4261    ///
4262    /// Returns an error if no transaction is active.
4263    #[cfg(feature = "lpg")]
4264    pub fn savepoint(&self, name: &str) -> Result<()> {
4265        let tx_id = self.current_transaction.lock().ok_or_else(|| {
4266            grafeo_common::utils::error::Error::Transaction(
4267                grafeo_common::utils::error::TransactionError::InvalidState(
4268                    "No active transaction".to_string(),
4269                ),
4270            )
4271        })?;
4272
4273        // Capture state for every graph touched so far.
4274        let touched = self.touched_graphs.lock().clone();
4275        let graph_snapshots: Vec<GraphSavepoint> = touched
4276            .iter()
4277            .map(|graph_name| {
4278                let store = self.resolve_store(graph_name);
4279                GraphSavepoint {
4280                    graph_name: graph_name.clone(),
4281                    next_node_id: store.peek_next_node_id(),
4282                    next_edge_id: store.peek_next_edge_id(),
4283                    undo_log_position: store.property_undo_log_position(tx_id),
4284                }
4285            })
4286            .collect();
4287
4288        self.savepoints.lock().push(SavepointState {
4289            name: name.to_string(),
4290            graph_snapshots,
4291            active_graph: self.current_graph.lock().clone(),
4292            #[cfg(feature = "cdc")]
4293            cdc_event_position: self
4294                .cdc_pending_events
4295                .as_ref()
4296                .map_or(0, |p| p.lock().len()),
4297        });
4298        Ok(())
4299    }
4300
4301    /// Rolls back to a named savepoint, undoing all writes made after it.
4302    ///
4303    /// The savepoint and any savepoints created after it are removed.
4304    /// Entities with IDs >= the savepoint snapshot are discarded.
4305    ///
4306    /// # Errors
4307    ///
4308    /// Returns an error if no transaction is active or the savepoint does not exist.
4309    #[cfg(feature = "lpg")]
4310    pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
4311        let transaction_id = self.current_transaction.lock().ok_or_else(|| {
4312            grafeo_common::utils::error::Error::Transaction(
4313                grafeo_common::utils::error::TransactionError::InvalidState(
4314                    "No active transaction".to_string(),
4315                ),
4316            )
4317        })?;
4318
4319        let mut savepoints = self.savepoints.lock();
4320
4321        // Find the savepoint by name (search from the end for nested savepoints)
4322        let pos = savepoints
4323            .iter()
4324            .rposition(|sp| sp.name == name)
4325            .ok_or_else(|| {
4326                grafeo_common::utils::error::Error::Transaction(
4327                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
4328                        "Savepoint '{name}' not found"
4329                    )),
4330                )
4331            })?;
4332
4333        let sp_state = savepoints[pos].clone();
4334
4335        // Remove this savepoint and all later ones
4336        savepoints.truncate(pos);
4337        drop(savepoints);
4338
4339        // Roll back each graph that was captured in the savepoint.
4340        for gs in &sp_state.graph_snapshots {
4341            let store = self.resolve_store(&gs.graph_name);
4342
4343            // Replay property/label undo entries recorded after the savepoint
4344            store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);
4345
4346            // Discard entities created after the savepoint
4347            let current_next_node = store.peek_next_node_id();
4348            let current_next_edge = store.peek_next_edge_id();
4349
4350            let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
4351                .map(NodeId::new)
4352                .collect();
4353            let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
4354                .map(EdgeId::new)
4355                .collect();
4356
4357            if !node_ids.is_empty() || !edge_ids.is_empty() {
4358                store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
4359            }
4360        }
4361
4362        // Also roll back any graphs that were touched AFTER the savepoint
4363        // but not captured in it. These need full discard since the savepoint
4364        // didn't include them.
4365        let touched = self.touched_graphs.lock().clone();
4366        for graph_name in &touched {
4367            let already_captured = sp_state
4368                .graph_snapshots
4369                .iter()
4370                .any(|gs| gs.graph_name == *graph_name);
4371            if !already_captured {
4372                let store = self.resolve_store(graph_name);
4373                store.discard_uncommitted_versions(transaction_id);
4374            }
4375        }
4376
4377        // Truncate CDC event buffer to the savepoint position.
4378        #[cfg(feature = "cdc")]
4379        if let Some(ref pending) = self.cdc_pending_events {
4380            pending.lock().truncate(sp_state.cdc_event_position);
4381        }
4382
4383        // Restore touched_graphs to only the graphs that were known at savepoint time.
4384        let mut touched = self.touched_graphs.lock();
4385        touched.clear();
4386        for gs in &sp_state.graph_snapshots {
4387            if !touched.contains(&gs.graph_name) {
4388                touched.push(gs.graph_name.clone());
4389            }
4390        }
4391
4392        Ok(())
4393    }
4394
4395    /// Releases (removes) a named savepoint without rolling back.
4396    ///
4397    /// # Errors
4398    ///
4399    /// Returns an error if no transaction is active or the savepoint does not exist.
4400    pub fn release_savepoint(&self, name: &str) -> Result<()> {
4401        let _tx_id = self.current_transaction.lock().ok_or_else(|| {
4402            grafeo_common::utils::error::Error::Transaction(
4403                grafeo_common::utils::error::TransactionError::InvalidState(
4404                    "No active transaction".to_string(),
4405                ),
4406            )
4407        })?;
4408
4409        let mut savepoints = self.savepoints.lock();
4410        let pos = savepoints
4411            .iter()
4412            .rposition(|sp| sp.name == name)
4413            .ok_or_else(|| {
4414                grafeo_common::utils::error::Error::Transaction(
4415                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
4416                        "Savepoint '{name}' not found"
4417                    )),
4418                )
4419            })?;
4420        savepoints.remove(pos);
4421        Ok(())
4422    }
4423
4424    /// Returns whether a transaction is active.
4425    #[must_use]
4426    pub fn in_transaction(&self) -> bool {
4427        self.current_transaction.lock().is_some()
4428    }
4429
4430    /// Returns the current transaction ID, if any.
4431    #[must_use]
4432    pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
4433        *self.current_transaction.lock()
4434    }
4435
4436    /// Returns a reference to the transaction manager.
4437    #[must_use]
4438    pub(crate) fn transaction_manager(&self) -> &TransactionManager {
4439        &self.transaction_manager
4440    }
4441
4442    /// Returns the store's current node count and the count at transaction start.
4443    #[cfg(feature = "lpg")]
4444    #[must_use]
4445    pub(crate) fn node_count_delta(&self) -> (usize, usize) {
4446        (
4447            self.transaction_start_node_count.load(Ordering::Relaxed),
4448            self.active_lpg_store().node_count(),
4449        )
4450    }
4451
4452    /// Returns the store's current edge count and the count at transaction start.
4453    #[cfg(feature = "lpg")]
4454    #[must_use]
4455    pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
4456        (
4457            self.transaction_start_edge_count.load(Ordering::Relaxed),
4458            self.active_lpg_store().edge_count(),
4459        )
4460    }
4461
4462    /// Prepares the current transaction for a two-phase commit.
4463    ///
4464    /// Returns a [`PreparedCommit`](crate::transaction::PreparedCommit) that
4465    /// lets you inspect pending changes and attach metadata before finalizing.
4466    /// The mutable borrow prevents concurrent operations while the commit is
4467    /// pending.
4468    ///
4469    /// If the `PreparedCommit` is dropped without calling `commit()` or
4470    /// `abort()`, the transaction is automatically rolled back.
4471    ///
4472    /// # Errors
4473    ///
4474    /// Returns an error if no transaction is active.
4475    ///
4476    /// # Examples
4477    ///
4478    /// ```no_run
4479    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
4480    /// use grafeo_engine::GrafeoDB;
4481    ///
4482    /// let db = GrafeoDB::new_in_memory();
4483    /// let mut session = db.session();
4484    ///
4485    /// session.begin_transaction()?;
4486    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
4487    ///
4488    /// let mut prepared = session.prepare_commit()?;
4489    /// println!("Nodes written: {}", prepared.info().nodes_written);
4490    /// prepared.set_metadata("audit_user", "admin");
4491    /// prepared.commit()?;
4492    /// # Ok(())
4493    /// # }
4494    /// ```
4495    #[cfg(feature = "lpg")]
4496    pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
4497        crate::transaction::PreparedCommit::new(self)
4498    }
4499
4500    /// Sets auto-commit mode.
4501    pub fn set_auto_commit(&mut self, auto_commit: bool) {
4502        self.auto_commit = auto_commit;
4503    }
4504
4505    /// Returns whether auto-commit is enabled.
4506    #[must_use]
4507    pub fn auto_commit(&self) -> bool {
4508        self.auto_commit
4509    }
4510
4511    /// Returns `true` if auto-commit should wrap this execution.
4512    ///
4513    /// Auto-commit kicks in when: the session is in auto-commit mode,
4514    /// no explicit transaction is active, and the query mutates data.
4515    fn needs_auto_commit(&self, has_mutations: bool) -> bool {
4516        self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
4517    }
4518
4519    /// Wraps `body` in an automatic begin/commit when [`needs_auto_commit`]
4520    /// returns `true`. On error the transaction is rolled back.
4521    #[cfg(feature = "lpg")]
4522    fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
4523    where
4524        F: FnOnce() -> Result<QueryResult>,
4525    {
4526        if self.needs_auto_commit(has_mutations) {
4527            self.begin_transaction_inner(false, None)?;
4528            match body() {
4529                Ok(result) => {
4530                    self.commit_inner()?;
4531                    Ok(result)
4532                }
4533                Err(e) => {
4534                    let _ = self.rollback_inner();
4535                    Err(e)
4536                }
4537            }
4538        } else {
4539            body()
4540        }
4541    }
4542
4543    /// Non-LPG stub: no auto-commit wrapping (SPARQL UPDATE is atomic).
4544    #[cfg(not(feature = "lpg"))]
4545    fn with_auto_commit<F>(&self, _has_mutations: bool, body: F) -> Result<QueryResult>
4546    where
4547        F: FnOnce() -> Result<QueryResult>,
4548    {
4549        body()
4550    }
4551
4552    /// Quick heuristic: returns `true` when the query text looks like it
4553    /// performs a mutation. Used by `_with_params` paths that go through the
4554    /// `QueryProcessor` (where the logical plan isn't available before
4555    /// execution). False negatives are harmless: the data just won't be
4556    /// auto-committed, which matches the prior behaviour.
4557    fn query_looks_like_mutation(query: &str) -> bool {
4558        let upper = query.to_ascii_uppercase();
4559        upper.contains("INSERT")
4560            || upper.contains("CREATE")
4561            || upper.contains("DELETE")
4562            || upper.contains("MERGE")
4563            || upper.contains("SET")
4564            || upper.contains("REMOVE")
4565            || upper.contains("DROP")
4566            || upper.contains("ALTER")
4567    }
4568
4569    /// Returns `Err(Transaction(InvalidState))` if any `ResultStream` is
4570    /// still outstanding for this session.
4571    #[cfg(feature = "lpg")]
4572    fn check_no_active_streams(&self, op: &str) -> Result<()> {
4573        if self.active_streams.load(Ordering::Acquire) > 0 {
4574            return Err(grafeo_common::utils::error::Error::Transaction(
4575                grafeo_common::utils::error::TransactionError::InvalidState(format!(
4576                    "Cannot {op} while streaming results are active; drop the stream first"
4577                )),
4578            ));
4579        }
4580        Ok(())
4581    }
4582
4583    /// Computes the wall-clock deadline for query execution.
4584    #[must_use]
4585    fn query_deadline(&self) -> Option<Instant> {
4586        #[cfg(not(target_arch = "wasm32"))]
4587        {
4588            self.query_timeout.map(|d| Instant::now() + d)
4589        }
4590        #[cfg(target_arch = "wasm32")]
4591        {
4592            let _ = &self.query_timeout;
4593            None
4594        }
4595    }
4596
4597    /// Creates an executor with deadline and timeout duration configured.
4598    fn make_executor(&self, columns: Vec<String>) -> Executor {
4599        Executor::with_columns(columns)
4600            .with_deadline(self.query_deadline())
4601            .with_timeout_duration(self.query_timeout)
4602    }
4603
4604    /// Creates a per-query `OperatorMemoryContext` for memory-aware spilling.
4605    ///
4606    /// Returns `None` if the buffer manager is not configured or has no spill path,
4607    /// in which case operators will use row-count fallback thresholds.
4608    #[cfg(feature = "spill")]
4609    fn make_operator_memory_context(
4610        &self,
4611    ) -> Option<grafeo_core::execution::OperatorMemoryContext> {
4612        let bm = self.buffer_manager.as_ref()?;
4613        let spill_path = bm.config().spill_path.as_ref()?;
4614        // Per-query isolation: create a unique subdirectory
4615        let query_id = self
4616            .commit_counter
4617            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
4618        let query_dir = spill_path.join(format!("query_{query_id}"));
4619        let sm = std::sync::Arc::new(
4620            grafeo_core::execution::SpillManager::new(&query_dir)
4621                .ok()?
4622                .with_owned_dir(),
4623        );
4624        Some(grafeo_core::execution::OperatorMemoryContext::new(
4625            std::sync::Arc::clone(bm),
4626            sm,
4627        ))
4628    }
4629
4630    /// Checks that a property value does not exceed the configured size limit.
4631    fn check_property_size(&self, key: &str, value: &Value) -> Result<()> {
4632        if let Some(limit) = self.max_property_size {
4633            let size = value.estimated_size_bytes();
4634            if size > limit {
4635                let limit_display = if limit >= 1024 * 1024 && limit % (1024 * 1024) == 0 {
4636                    format!("{} MiB", limit / (1024 * 1024))
4637                } else if limit >= 1024 && limit % 1024 == 0 {
4638                    format!("{} KiB", limit / 1024)
4639                } else {
4640                    format!("{limit} bytes")
4641                };
4642                return Err(grafeo_common::utils::error::Error::Query(
4643                    grafeo_common::utils::error::QueryError::new(
4644                        grafeo_common::utils::error::QueryErrorKind::Execution,
4645                        format!(
4646                            "Property '{key}' value exceeds maximum size of {limit_display} ({size} bytes)"
4647                        ),
4648                    )
4649                    .with_hint(
4650                        "Increase with Config::with_max_property_size() or disable with Config::without_max_property_size()".to_string(),
4651                    ),
4652                ));
4653            }
4654        }
4655        Ok(())
4656    }
4657
4658    /// Records query metrics for any language.
4659    ///
4660    /// Called after query execution to update counters, latency histogram,
4661    /// and per-language tracking. `elapsed_ms` should be `None` on WASM
4662    /// where `Instant` is unavailable.
4663    #[cfg(feature = "metrics")]
4664    fn record_query_metrics(
4665        &self,
4666        language: &str,
4667        elapsed_ms: Option<f64>,
4668        result: &Result<crate::database::QueryResult>,
4669    ) {
4670        use crate::metrics::record_metric;
4671
4672        record_metric!(self.metrics, query_count, inc);
4673        if let Some(ref reg) = self.metrics {
4674            reg.query_count_by_language.increment(language);
4675        }
4676        if let Some(ms) = elapsed_ms {
4677            record_metric!(self.metrics, query_latency, observe ms);
4678        }
4679        match result {
4680            Ok(r) => {
4681                let returned = r.rows.len() as u64;
4682                record_metric!(self.metrics, rows_returned, add returned);
4683                if let Some(scanned) = r.rows_scanned {
4684                    record_metric!(self.metrics, rows_scanned, add scanned);
4685                }
4686            }
4687            Err(e) => {
4688                record_metric!(self.metrics, query_errors, inc);
4689                // Detect timeout errors
4690                let msg = e.to_string();
4691                if msg.contains("exceeded timeout") {
4692                    record_metric!(self.metrics, query_timeouts, inc);
4693                }
4694            }
4695        }
4696    }
4697
4698    /// Evaluates a simple integer literal from a session parameter expression.
4699    #[cfg(feature = "gql")]
4700    fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
4701        use grafeo_adapters::query::gql::ast::{Expression, Literal};
4702        match expr {
4703            Expression::Literal(Literal::Integer(n)) => Some(*n),
4704            _ => None,
4705        }
4706    }
4707
4708    /// Returns the current transaction context for MVCC visibility.
4709    ///
4710    /// Returns `(viewing_epoch, transaction_id)` where:
4711    /// - `viewing_epoch` is the epoch at which to check version visibility
4712    /// - `transaction_id` is the current transaction ID (if in a transaction)
4713    #[must_use]
4714    fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
4715        // Time-travel override takes precedence (read-only, no tx context)
4716        if let Some(epoch) = *self.viewing_epoch_override.lock() {
4717            return (epoch, None);
4718        }
4719
4720        if let Some(transaction_id) = *self.current_transaction.lock() {
4721            // In a transaction: use the transaction's start epoch
4722            let epoch = self
4723                .transaction_manager
4724                .start_epoch(transaction_id)
4725                .unwrap_or_else(|| self.transaction_manager.current_epoch());
4726            (epoch, Some(transaction_id))
4727        } else {
4728            // No transaction: use current epoch
4729            (self.transaction_manager.current_epoch(), None)
4730        }
4731    }
4732
4733    /// Creates a planner with transaction context and constraint validator.
4734    ///
4735    /// The `store` parameter is the graph store to plan against (use
4736    /// `self.active_store()` for graph-aware execution).
4737    fn create_planner_for_store(
4738        &self,
4739        store: Arc<dyn GraphStoreSearch>,
4740        viewing_epoch: EpochId,
4741        transaction_id: Option<TransactionId>,
4742    ) -> crate::query::Planner {
4743        self.create_planner_for_store_with_read_only(store, viewing_epoch, transaction_id, false)
4744    }
4745
4746    fn create_planner_for_store_with_read_only(
4747        &self,
4748        store: Arc<dyn GraphStoreSearch>,
4749        viewing_epoch: EpochId,
4750        transaction_id: Option<TransactionId>,
4751        read_only: bool,
4752    ) -> crate::query::Planner {
4753        use crate::query::Planner;
4754        use grafeo_core::execution::operators::{LazyValue, SessionContext};
4755
4756        // Capture store reference for lazy introspection (only computed if info()/schema() called).
4757        let info_store = Arc::clone(&store);
4758        let schema_store = Arc::clone(&store);
4759
4760        let session_context = SessionContext {
4761            current_schema: self.current_schema(),
4762            current_graph: self.current_graph(),
4763            db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
4764            schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
4765        };
4766
4767        let write_store = self.active_write_store();
4768
4769        let mut planner = Planner::with_context(
4770            Arc::clone(&store),
4771            write_store,
4772            Arc::clone(&self.transaction_manager),
4773            transaction_id,
4774            viewing_epoch,
4775        )
4776        .with_factorized_execution(self.factorized_execution)
4777        .with_catalog(Arc::clone(&self.catalog))
4778        .with_session_context(session_context)
4779        .with_read_only(read_only);
4780
4781        // Attach the LPG store so CALL grafeo.search.* procedures can reach
4782        // HNSW / BM25 indexes. Skip when the session is backed by an external
4783        // store — `self.store` is an empty placeholder in that case and would
4784        // make search procedures see a store with no data or indexes.
4785        #[cfg(feature = "lpg")]
4786        if matches!(self.lpg_backend, LpgBackend::Active) {
4787            planner = planner.with_lpg_store(Arc::clone(&self.store));
4788        }
4789
4790        // Attach the constraint validator for schema enforcement and property size limits
4791        let validator = CatalogConstraintValidator::new(Arc::clone(&self.catalog))
4792            .with_store(store)
4793            .with_max_property_size(self.max_property_size);
4794        planner = planner.with_validator(Arc::new(validator));
4795
4796        planner
4797    }
4798
4799    /// Builds a `Value::Map` for the `info()` introspection function.
4800    fn build_info_value(store: &dyn GraphStore) -> Value {
4801        use grafeo_common::types::PropertyKey;
4802        use std::collections::BTreeMap;
4803
4804        let mut map = BTreeMap::new();
4805        map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
4806        // reason: node/edge counts will not exceed i64::MAX
4807        #[allow(clippy::cast_possible_wrap)]
4808        let node_count = store.node_count() as i64;
4809        // reason: value is a small counter, well within i64::MAX
4810        #[allow(clippy::cast_possible_wrap)]
4811        let edge_count = store.edge_count() as i64;
4812        map.insert(PropertyKey::from("node_count"), Value::Int64(node_count));
4813        map.insert(PropertyKey::from("edge_count"), Value::Int64(edge_count));
4814        map.insert(
4815            PropertyKey::from("version"),
4816            Value::String(env!("CARGO_PKG_VERSION").into()),
4817        );
4818        Value::Map(map.into())
4819    }
4820
4821    /// Builds a `Value::Map` for the `schema()` introspection function.
4822    fn build_schema_value(store: &dyn GraphStore) -> Value {
4823        use grafeo_common::types::PropertyKey;
4824        use std::collections::BTreeMap;
4825
4826        let labels: Vec<Value> = store
4827            .all_labels()
4828            .into_iter()
4829            .map(|l| Value::String(l.into()))
4830            .collect();
4831        let edge_types: Vec<Value> = store
4832            .all_edge_types()
4833            .into_iter()
4834            .map(|t| Value::String(t.into()))
4835            .collect();
4836        let property_keys: Vec<Value> = store
4837            .all_property_keys()
4838            .into_iter()
4839            .map(|k| Value::String(k.into()))
4840            .collect();
4841
4842        let mut map = BTreeMap::new();
4843        map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
4844        map.insert(
4845            PropertyKey::from("edge_types"),
4846            Value::List(edge_types.into()),
4847        );
4848        map.insert(
4849            PropertyKey::from("property_keys"),
4850            Value::List(property_keys.into()),
4851        );
4852        Value::Map(map.into())
4853    }
4854
4855    /// Creates a node directly (bypassing query execution).
4856    ///
4857    /// This is a low-level API for testing and direct manipulation.
4858    /// If a transaction is active, the node will be versioned with the transaction ID.
4859    #[cfg(feature = "lpg")]
4860    pub fn create_node(&self, labels: &[&str]) -> NodeId {
4861        let (epoch, transaction_id) = self.get_transaction_context();
4862        let id = self.active_lpg_store().create_node_versioned(
4863            labels,
4864            epoch,
4865            transaction_id.unwrap_or(TransactionId::SYSTEM),
4866        );
4867
4868        #[cfg(feature = "wal")]
4869        self.log_wal_record(&grafeo_storage::wal::WalRecord::CreateNode {
4870            id,
4871            labels: labels.iter().map(|s| (*s).to_string()).collect(),
4872        });
4873
4874        id
4875    }
4876
4877    /// Creates a node with properties.
4878    ///
4879    /// If a transaction is active, the node will be versioned with the transaction ID.
4880    ///
4881    /// # Errors
4882    ///
4883    /// Returns an error if any property value exceeds the configured `max_property_size`.
4884    #[cfg(feature = "lpg")]
4885    pub fn create_node_with_props<'a>(
4886        &self,
4887        labels: &[&str],
4888        properties: impl IntoIterator<Item = (&'a str, Value)>,
4889    ) -> Result<NodeId> {
4890        let props: Vec<(&str, Value)> = properties.into_iter().collect();
4891        for (key, value) in &props {
4892            self.check_property_size(key, value)?;
4893        }
4894
4895        // Snapshot the props for WAL before passing them to the LPG store.
4896        // The LPG store consumes `props` by value; we need owned copies for
4897        // post-hoc WAL emission.
4898        #[cfg(feature = "wal")]
4899        let wal_props: Vec<(String, Value)> = props
4900            .iter()
4901            .map(|(k, v)| ((*k).to_string(), v.clone()))
4902            .collect();
4903
4904        let (epoch, transaction_id) = self.get_transaction_context();
4905        let id = self.active_lpg_store().create_node_with_props_versioned(
4906            labels,
4907            props,
4908            epoch,
4909            transaction_id.unwrap_or(TransactionId::SYSTEM),
4910        );
4911
4912        #[cfg(feature = "wal")]
4913        {
4914            self.log_wal_record(&grafeo_storage::wal::WalRecord::CreateNode {
4915                id,
4916                labels: labels.iter().map(|s| (*s).to_string()).collect(),
4917            });
4918            for (key, value) in wal_props {
4919                self.log_wal_record(&grafeo_storage::wal::WalRecord::SetNodeProperty {
4920                    id,
4921                    key,
4922                    value,
4923                });
4924            }
4925        }
4926
4927        Ok(id)
4928    }
4929
4930    /// Creates an edge between two nodes.
4931    ///
4932    /// This is a low-level API for testing and direct manipulation.
4933    /// If a transaction is active, the edge will be versioned with the transaction ID.
4934    #[cfg(feature = "lpg")]
4935    pub fn create_edge(
4936        &self,
4937        src: NodeId,
4938        dst: NodeId,
4939        edge_type: &str,
4940    ) -> grafeo_common::types::EdgeId {
4941        let (epoch, transaction_id) = self.get_transaction_context();
4942        let eid = self.active_lpg_store().create_edge_versioned(
4943            src,
4944            dst,
4945            edge_type,
4946            epoch,
4947            transaction_id.unwrap_or(TransactionId::SYSTEM),
4948        );
4949
4950        #[cfg(feature = "wal")]
4951        self.log_wal_record(&grafeo_storage::wal::WalRecord::CreateEdge {
4952            id: eid,
4953            src,
4954            dst,
4955            edge_type: edge_type.to_string(),
4956        });
4957
4958        eid
4959    }
4960
4961    /// Creates an edge with properties within the active transaction context.
4962    ///
4963    /// # Errors
4964    ///
4965    /// Returns an error if any property value exceeds the configured `max_property_size`.
4966    #[cfg(feature = "lpg")]
4967    pub fn create_edge_with_props<'a>(
4968        &self,
4969        src: NodeId,
4970        dst: NodeId,
4971        edge_type: &str,
4972        properties: impl IntoIterator<Item = (&'a str, Value)>,
4973    ) -> Result<grafeo_common::types::EdgeId> {
4974        let props: Vec<(&str, Value)> = properties.into_iter().collect();
4975        for (key, value) in &props {
4976            self.check_property_size(key, value)?;
4977        }
4978        let (epoch, transaction_id) = self.get_transaction_context();
4979        let tid = transaction_id.unwrap_or(TransactionId::SYSTEM);
4980        let store = self.active_lpg_store();
4981        let eid = store.create_edge_versioned(src, dst, edge_type, epoch, tid);
4982        for (key, value) in &props {
4983            store.set_edge_property_versioned(eid, key, value.clone(), tid);
4984        }
4985
4986        #[cfg(feature = "wal")]
4987        {
4988            self.log_wal_record(&grafeo_storage::wal::WalRecord::CreateEdge {
4989                id: eid,
4990                src,
4991                dst,
4992                edge_type: edge_type.to_string(),
4993            });
4994            for (key, value) in props {
4995                self.log_wal_record(&grafeo_storage::wal::WalRecord::SetEdgeProperty {
4996                    id: eid,
4997                    key: key.to_string(),
4998                    value,
4999                });
5000            }
5001        }
5002
5003        Ok(eid)
5004    }
5005
5006    /// Sets a node property within the active transaction context.
5007    ///
5008    /// # Errors
5009    ///
5010    /// Returns an error if the value exceeds the configured `max_property_size`.
5011    #[cfg(feature = "lpg")]
5012    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) -> Result<()> {
5013        self.check_property_size(key, &value)?;
5014        let (_, transaction_id) = self.get_transaction_context();
5015
5016        #[cfg(feature = "wal")]
5017        let value_for_wal = value.clone();
5018
5019        if let Some(tid) = transaction_id {
5020            self.active_lpg_store()
5021                .set_node_property_versioned(id, key, value, tid);
5022        } else {
5023            self.active_lpg_store().set_node_property(id, key, value);
5024        }
5025
5026        #[cfg(feature = "wal")]
5027        self.log_wal_record(&grafeo_storage::wal::WalRecord::SetNodeProperty {
5028            id,
5029            key: key.to_string(),
5030            value: value_for_wal,
5031        });
5032
5033        Ok(())
5034    }
5035
5036    /// Sets an edge property within the active transaction context.
5037    ///
5038    /// # Errors
5039    ///
5040    /// Returns an error if the value exceeds the configured `max_property_size`.
5041    #[cfg(feature = "lpg")]
5042    pub fn set_edge_property(
5043        &self,
5044        id: grafeo_common::types::EdgeId,
5045        key: &str,
5046        value: Value,
5047    ) -> Result<()> {
5048        self.check_property_size(key, &value)?;
5049        let (_, transaction_id) = self.get_transaction_context();
5050
5051        #[cfg(feature = "wal")]
5052        let value_for_wal = value.clone();
5053
5054        if let Some(tid) = transaction_id {
5055            self.active_lpg_store()
5056                .set_edge_property_versioned(id, key, value, tid);
5057        } else {
5058            self.active_lpg_store().set_edge_property(id, key, value);
5059        }
5060
5061        #[cfg(feature = "wal")]
5062        self.log_wal_record(&grafeo_storage::wal::WalRecord::SetEdgeProperty {
5063            id,
5064            key: key.to_string(),
5065            value: value_for_wal,
5066        });
5067
5068        Ok(())
5069    }
5070
5071    /// Deletes a node within the active transaction context.
5072    #[cfg(feature = "lpg")]
5073    pub fn delete_node(&self, id: NodeId) -> bool {
5074        let (epoch, transaction_id) = self.get_transaction_context();
5075        let deleted = if let Some(tid) = transaction_id {
5076            self.active_lpg_store()
5077                .delete_node_versioned(id, epoch, tid)
5078        } else {
5079            self.active_lpg_store().delete_node(id)
5080        };
5081
5082        #[cfg(feature = "wal")]
5083        if deleted {
5084            self.log_wal_record(&grafeo_storage::wal::WalRecord::DeleteNode { id });
5085        }
5086
5087        deleted
5088    }
5089
5090    /// Deletes an edge within the active transaction context.
5091    #[cfg(feature = "lpg")]
5092    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
5093        let (epoch, transaction_id) = self.get_transaction_context();
5094        let deleted = if let Some(tid) = transaction_id {
5095            self.active_lpg_store()
5096                .delete_edge_versioned(id, epoch, tid)
5097        } else {
5098            self.active_lpg_store().delete_edge(id)
5099        };
5100
5101        #[cfg(feature = "wal")]
5102        if deleted {
5103            self.log_wal_record(&grafeo_storage::wal::WalRecord::DeleteEdge { id });
5104        }
5105
5106        deleted
5107    }
5108
5109    // =========================================================================
5110    // Direct Lookup APIs (bypass query planning for O(1) point reads)
5111    // =========================================================================
5112
5113    /// Gets a node by ID directly, bypassing query planning.
5114    ///
5115    /// This is the fastest way to retrieve a single node when you know its ID.
5116    /// Skips parsing, binding, optimization, and physical planning entirely.
5117    ///
5118    /// # Performance
5119    ///
5120    /// - Time complexity: O(1) average case
5121    /// - No lock contention (uses DashMap internally)
5122    /// - ~20-30x faster than equivalent MATCH query
5123    ///
5124    /// # Example
5125    ///
5126    /// ```no_run
5127    /// # use grafeo_engine::GrafeoDB;
5128    /// # let db = GrafeoDB::new_in_memory();
5129    /// let session = db.session();
5130    /// let node_id = session.create_node(&["Person"]);
5131    ///
5132    /// // Direct lookup - O(1), no query planning
5133    /// let node = session.get_node(node_id);
5134    /// assert!(node.is_some());
5135    /// ```
5136    #[cfg(feature = "lpg")]
5137    #[must_use]
5138    pub fn get_node(&self, id: NodeId) -> Option<Node> {
5139        let (epoch, transaction_id) = self.get_transaction_context();
5140        self.active_lpg_store().get_node_versioned(
5141            id,
5142            epoch,
5143            transaction_id.unwrap_or(TransactionId::SYSTEM),
5144        )
5145    }
5146
5147    /// Gets a single property from a node by ID, bypassing query planning.
5148    ///
5149    /// More efficient than `get_node()` when you only need one property,
5150    /// as it avoids loading the full node with all properties.
5151    ///
5152    /// # Performance
5153    ///
5154    /// - Time complexity: O(1) average case
5155    /// - No query planning overhead
5156    ///
5157    /// # Example
5158    ///
5159    /// ```no_run
5160    /// # use grafeo_engine::GrafeoDB;
5161    /// # use grafeo_common::types::Value;
5162    /// # let db = GrafeoDB::new_in_memory();
5163    /// let session = db.session();
5164    /// let id = session.create_node_with_props(&["Person"], [("name", "Alix".into())]).unwrap();
5165    ///
5166    /// // Direct property access - O(1)
5167    /// let name = session.get_node_property(id, "name");
5168    /// assert_eq!(name, Some(Value::String("Alix".into())));
5169    /// ```
5170    #[cfg(feature = "lpg")]
5171    #[must_use]
5172    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
5173        self.get_node(id)
5174            .and_then(|node| node.get_property(key).cloned())
5175    }
5176
5177    /// Gets an edge by ID directly, bypassing query planning.
5178    ///
5179    /// # Performance
5180    ///
5181    /// - Time complexity: O(1) average case
5182    /// - No lock contention
5183    #[cfg(feature = "lpg")]
5184    #[must_use]
5185    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
5186        let (epoch, transaction_id) = self.get_transaction_context();
5187        self.active_lpg_store().get_edge_versioned(
5188            id,
5189            epoch,
5190            transaction_id.unwrap_or(TransactionId::SYSTEM),
5191        )
5192    }
5193
5194    /// Gets outgoing neighbors of a node directly, bypassing query planning.
5195    ///
5196    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
5197    ///
5198    /// # Performance
5199    ///
5200    /// - Time complexity: O(degree) where degree is the number of outgoing edges
5201    /// - Uses adjacency index for direct access
5202    /// - ~10-20x faster than equivalent MATCH query
5203    ///
5204    /// # Example
5205    ///
5206    /// ```no_run
5207    /// # use grafeo_engine::GrafeoDB;
5208    /// # let db = GrafeoDB::new_in_memory();
5209    /// let session = db.session();
5210    /// let alix = session.create_node(&["Person"]);
5211    /// let gus = session.create_node(&["Person"]);
5212    /// session.create_edge(alix, gus, "KNOWS");
5213    ///
5214    /// // Direct neighbor lookup - O(degree)
5215    /// let neighbors = session.get_neighbors_outgoing(alix);
5216    /// assert_eq!(neighbors.len(), 1);
5217    /// assert_eq!(neighbors[0].0, gus);
5218    /// ```
5219    #[cfg(feature = "lpg")]
5220    #[must_use]
5221    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
5222        self.active_lpg_store()
5223            .edges_from(node, Direction::Outgoing)
5224            .collect()
5225    }
5226
5227    /// Gets incoming neighbors of a node directly, bypassing query planning.
5228    ///
5229    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
5230    ///
5231    /// # Performance
5232    ///
5233    /// - Time complexity: O(degree) where degree is the number of incoming edges
5234    /// - Uses backward adjacency index for direct access
5235    #[cfg(feature = "lpg")]
5236    #[must_use]
5237    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
5238        self.active_lpg_store()
5239            .edges_from(node, Direction::Incoming)
5240            .collect()
5241    }
5242
5243    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
5244    ///
5245    /// # Example
5246    ///
5247    /// ```no_run
5248    /// # use grafeo_engine::GrafeoDB;
5249    /// # let db = GrafeoDB::new_in_memory();
5250    /// # let session = db.session();
5251    /// # let alix = session.create_node(&["Person"]);
5252    /// let neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
5253    /// ```
5254    #[cfg(feature = "lpg")]
5255    #[must_use]
5256    pub fn get_neighbors_outgoing_by_type(
5257        &self,
5258        node: NodeId,
5259        edge_type: &str,
5260    ) -> Vec<(NodeId, EdgeId)> {
5261        self.active_lpg_store()
5262            .edges_from(node, Direction::Outgoing)
5263            .filter(|(_, edge_id)| {
5264                self.get_edge(*edge_id)
5265                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
5266            })
5267            .collect()
5268    }
5269
5270    /// Checks if a node exists, bypassing query planning.
5271    ///
5272    /// # Performance
5273    ///
5274    /// - Time complexity: O(1)
5275    /// - Fastest existence check available
5276    #[cfg(feature = "lpg")]
5277    #[must_use]
5278    pub fn node_exists(&self, id: NodeId) -> bool {
5279        self.get_node(id).is_some()
5280    }
5281
5282    /// Checks if an edge exists, bypassing query planning.
5283    #[cfg(feature = "lpg")]
5284    #[must_use]
5285    pub fn edge_exists(&self, id: EdgeId) -> bool {
5286        self.get_edge(id).is_some()
5287    }
5288
5289    /// Gets the degree (number of edges) of a node.
5290    ///
5291    /// Returns (outgoing_degree, incoming_degree).
5292    #[cfg(feature = "lpg")]
5293    #[must_use]
5294    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
5295        let active = self.active_lpg_store();
5296        let out = active.out_degree(node);
5297        let in_degree = active.in_degree(node);
5298        (out, in_degree)
5299    }
5300
5301    /// Batch lookup of multiple nodes by ID.
5302    ///
5303    /// More efficient than calling `get_node()` in a loop because it
5304    /// amortizes overhead.
5305    ///
5306    /// # Performance
5307    ///
5308    /// - Time complexity: O(n) where n is the number of IDs
5309    /// - Better cache utilization than individual lookups
5310    #[cfg(feature = "lpg")]
5311    #[must_use]
5312    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
5313        let (epoch, transaction_id) = self.get_transaction_context();
5314        let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
5315        let active = self.active_lpg_store();
5316        ids.iter()
5317            .map(|&id| active.get_node_versioned(id, epoch, tx))
5318            .collect()
5319    }
5320
5321    // ── Change Data Capture ─────────────────────────────────────────────
5322
5323    /// Returns the full change history for an entity (node or edge).
5324    ///
5325    /// # Errors
5326    ///
5327    /// Returns an authorization error if the session lacks read permission.
5328    #[cfg(feature = "cdc")]
5329    pub fn history(
5330        &self,
5331        entity_id: impl Into<crate::cdc::EntityId>,
5332    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
5333        self.require_permission(crate::auth::StatementKind::Read)?;
5334        Ok(self.cdc_log.history(entity_id.into()))
5335    }
5336
5337    /// Returns change events for an entity since the given epoch.
5338    ///
5339    /// # Errors
5340    ///
5341    /// Returns an authorization error if the session lacks read permission.
5342    #[cfg(feature = "cdc")]
5343    pub fn history_since(
5344        &self,
5345        entity_id: impl Into<crate::cdc::EntityId>,
5346        since_epoch: EpochId,
5347    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
5348        self.require_permission(crate::auth::StatementKind::Read)?;
5349        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
5350    }
5351
5352    /// Returns all change events across all entities in an epoch range.
5353    ///
5354    /// # Errors
5355    ///
5356    /// Returns an authorization error if the session lacks read permission.
5357    #[cfg(feature = "cdc")]
5358    pub fn changes_between(
5359        &self,
5360        start_epoch: EpochId,
5361        end_epoch: EpochId,
5362    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
5363        self.require_permission(crate::auth::StatementKind::Read)?;
5364        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
5365    }
5366}
5367
5368impl Drop for Session {
5369    fn drop(&mut self) {
5370        // Auto-rollback any active transaction to prevent leaked MVCC state,
5371        // dangling write locks, and uncommitted versions lingering in the store.
5372        #[cfg(feature = "lpg")]
5373        if self.in_transaction() {
5374            let _ = self.rollback_inner();
5375        }
5376
5377        #[cfg(feature = "metrics")]
5378        if let Some(ref reg) = self.metrics {
5379            reg.session_active
5380                .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
5381        }
5382    }
5383}
5384
5385#[cfg(test)]
5386mod tests {
5387    use super::parse_default_literal;
5388    use crate::database::GrafeoDB;
5389    use grafeo_common::types::Value;
5390
5391    // -----------------------------------------------------------------------
5392    // parse_default_literal
5393    // -----------------------------------------------------------------------
5394
5395    #[test]
5396    fn parse_default_literal_null() {
5397        assert_eq!(parse_default_literal("null"), Value::Null);
5398        assert_eq!(parse_default_literal("NULL"), Value::Null);
5399        assert_eq!(parse_default_literal("Null"), Value::Null);
5400    }
5401
5402    #[test]
5403    fn parse_default_literal_bool() {
5404        assert_eq!(parse_default_literal("true"), Value::Bool(true));
5405        assert_eq!(parse_default_literal("TRUE"), Value::Bool(true));
5406        assert_eq!(parse_default_literal("false"), Value::Bool(false));
5407        assert_eq!(parse_default_literal("FALSE"), Value::Bool(false));
5408    }
5409
5410    #[test]
5411    fn parse_default_literal_string_single_quoted() {
5412        assert_eq!(
5413            parse_default_literal("'hello'"),
5414            Value::String("hello".into())
5415        );
5416    }
5417
5418    #[test]
5419    fn parse_default_literal_string_double_quoted() {
5420        assert_eq!(
5421            parse_default_literal("\"world\""),
5422            Value::String("world".into())
5423        );
5424    }
5425
5426    #[test]
5427    fn parse_default_literal_integer() {
5428        assert_eq!(parse_default_literal("42"), Value::Int64(42));
5429        assert_eq!(parse_default_literal("-7"), Value::Int64(-7));
5430        assert_eq!(parse_default_literal("0"), Value::Int64(0));
5431    }
5432
5433    #[test]
5434    fn parse_default_literal_float() {
5435        assert_eq!(parse_default_literal("9.81"), Value::Float64(9.81_f64));
5436        assert_eq!(parse_default_literal("-0.5"), Value::Float64(-0.5));
5437    }
5438
5439    #[test]
5440    fn parse_default_literal_fallback_string() {
5441        // Not a recognized literal, not quoted, not a number
5442        assert_eq!(
5443            parse_default_literal("some_identifier"),
5444            Value::String("some_identifier".into())
5445        );
5446    }
5447
5448    #[test]
5449    fn test_session_create_node() {
5450        let db = GrafeoDB::new_in_memory();
5451        let session = db.session();
5452
5453        let id = session.create_node(&["Person"]);
5454        assert!(id.is_valid());
5455        assert_eq!(db.node_count(), 1);
5456    }
5457
5458    #[test]
5459    fn test_session_transaction() {
5460        let db = GrafeoDB::new_in_memory();
5461        let mut session = db.session();
5462
5463        assert!(!session.in_transaction());
5464
5465        session.begin_transaction().unwrap();
5466        assert!(session.in_transaction());
5467
5468        session.commit().unwrap();
5469        assert!(!session.in_transaction());
5470    }
5471
5472    #[test]
5473    fn test_session_transaction_context() {
5474        let db = GrafeoDB::new_in_memory();
5475        let mut session = db.session();
5476
5477        // Without transaction - context should have current epoch and no transaction_id
5478        let (_epoch1, transaction_id1) = session.get_transaction_context();
5479        assert!(transaction_id1.is_none());
5480
5481        // Start a transaction
5482        session.begin_transaction().unwrap();
5483        let (epoch2, transaction_id2) = session.get_transaction_context();
5484        assert!(transaction_id2.is_some());
5485        // Transaction should have a valid epoch
5486        let _ = epoch2; // Use the variable
5487
5488        // Commit and verify
5489        session.commit().unwrap();
5490        let (epoch3, tx_id3) = session.get_transaction_context();
5491        assert!(tx_id3.is_none());
5492        // Epoch should have advanced after commit
5493        assert!(epoch3.as_u64() >= epoch2.as_u64());
5494    }
5495
5496    #[test]
5497    fn test_session_rollback() {
5498        let db = GrafeoDB::new_in_memory();
5499        let mut session = db.session();
5500
5501        session.begin_transaction().unwrap();
5502        session.rollback().unwrap();
5503        assert!(!session.in_transaction());
5504    }
5505
5506    #[test]
5507    fn test_session_rollback_discards_versions() {
5508        use grafeo_common::types::TransactionId;
5509
5510        let db = GrafeoDB::new_in_memory();
5511
5512        // Create a node outside of any transaction (at system level)
5513        let node_before = db.store().create_node(&["Person"]);
5514        assert!(node_before.is_valid());
5515        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
5516
5517        // Start a transaction
5518        let mut session = db.session();
5519        session.begin_transaction().unwrap();
5520        let transaction_id = session.current_transaction.lock().unwrap();
5521
5522        // Create a node versioned with the transaction's ID
5523        let epoch = db.store().current_epoch();
5524        let node_in_tx = db
5525            .store()
5526            .create_node_versioned(&["Person"], epoch, transaction_id);
5527        assert!(node_in_tx.is_valid());
5528
5529        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
5530        // non-versioned lookups like node_count(). Verify the node is visible
5531        // only through the owning transaction.
5532        assert_eq!(
5533            db.node_count(),
5534            1,
5535            "PENDING nodes should be invisible to non-versioned node_count()"
5536        );
5537        assert!(
5538            db.store()
5539                .get_node_versioned(node_in_tx, epoch, transaction_id)
5540                .is_some(),
5541            "Transaction node should be visible to its own transaction"
5542        );
5543
5544        // Rollback the transaction
5545        session.rollback().unwrap();
5546        assert!(!session.in_transaction());
5547
5548        // The node created in the transaction should be discarded
5549        // Only the first node should remain visible
5550        let count_after = db.node_count();
5551        assert_eq!(
5552            count_after, 1,
5553            "Rollback should discard uncommitted node, but got {count_after}"
5554        );
5555
5556        // The original node should still be accessible
5557        let current_epoch = db.store().current_epoch();
5558        assert!(
5559            db.store()
5560                .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
5561                .is_some(),
5562            "Original node should still exist"
5563        );
5564
5565        // The node created in the transaction should not be accessible
5566        assert!(
5567            db.store()
5568                .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
5569                .is_none(),
5570            "Transaction node should be gone"
5571        );
5572    }
5573
5574    #[test]
5575    fn test_session_create_node_in_transaction() {
5576        // Test that session.create_node() is transaction-aware
5577        let db = GrafeoDB::new_in_memory();
5578
5579        // Create a node outside of any transaction
5580        let node_before = db.create_node(&["Person"]);
5581        assert!(node_before.is_valid());
5582        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
5583
5584        // Start a transaction and create a node through the session
5585        let mut session = db.session();
5586        session.begin_transaction().unwrap();
5587        let transaction_id = session.current_transaction.lock().unwrap();
5588
5589        // Create a node through session.create_node() - should be versioned with tx
5590        let node_in_tx = session.create_node(&["Person"]);
5591        assert!(node_in_tx.is_valid());
5592
5593        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
5594        // non-versioned lookups. Verify the node is visible only to its own tx.
5595        assert_eq!(
5596            db.node_count(),
5597            1,
5598            "PENDING nodes should be invisible to non-versioned node_count()"
5599        );
5600        let epoch = db.store().current_epoch();
5601        assert!(
5602            db.store()
5603                .get_node_versioned(node_in_tx, epoch, transaction_id)
5604                .is_some(),
5605            "Transaction node should be visible to its own transaction"
5606        );
5607
5608        // Rollback the transaction
5609        session.rollback().unwrap();
5610
5611        // The node created via session.create_node() should be discarded
5612        let count_after = db.node_count();
5613        assert_eq!(
5614            count_after, 1,
5615            "Rollback should discard node created via session.create_node(), but got {count_after}"
5616        );
5617    }
5618
5619    #[test]
5620    fn test_session_create_node_with_props_in_transaction() {
5621        use grafeo_common::types::Value;
5622
5623        // Test that session.create_node_with_props() is transaction-aware
5624        let db = GrafeoDB::new_in_memory();
5625
5626        // Create a node outside of any transaction
5627        db.create_node(&["Person"]);
5628        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
5629
5630        // Start a transaction and create a node with properties
5631        let mut session = db.session();
5632        session.begin_transaction().unwrap();
5633        let transaction_id = session.current_transaction.lock().unwrap();
5634
5635        let node_in_tx = session
5636            .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5637            .unwrap();
5638        assert!(node_in_tx.is_valid());
5639
5640        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
5641        // non-versioned lookups. Verify the node is visible only to its own tx.
5642        assert_eq!(
5643            db.node_count(),
5644            1,
5645            "PENDING nodes should be invisible to non-versioned node_count()"
5646        );
5647        let epoch = db.store().current_epoch();
5648        assert!(
5649            db.store()
5650                .get_node_versioned(node_in_tx, epoch, transaction_id)
5651                .is_some(),
5652            "Transaction node should be visible to its own transaction"
5653        );
5654
5655        // Rollback the transaction
5656        session.rollback().unwrap();
5657
5658        // The node should be discarded
5659        let count_after = db.node_count();
5660        assert_eq!(
5661            count_after, 1,
5662            "Rollback should discard node created via session.create_node_with_props()"
5663        );
5664    }
5665
5666    #[cfg(feature = "gql")]
5667    mod gql_tests {
5668        use super::*;
5669
5670        #[test]
5671        fn test_gql_query_execution() {
5672            let db = GrafeoDB::new_in_memory();
5673            let session = db.session();
5674
5675            // Create some test data
5676            session.create_node(&["Person"]);
5677            session.create_node(&["Person"]);
5678            session.create_node(&["Animal"]);
5679
5680            // Execute a GQL query
5681            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
5682
5683            // Should return 2 Person nodes
5684            assert_eq!(result.row_count(), 2);
5685            assert_eq!(result.column_count(), 1);
5686            assert_eq!(result.columns[0], "n");
5687        }
5688
5689        #[test]
5690        fn test_gql_empty_result() {
5691            let db = GrafeoDB::new_in_memory();
5692            let session = db.session();
5693
5694            // No data in database
5695            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
5696
5697            assert_eq!(result.row_count(), 0);
5698        }
5699
5700        #[test]
5701        fn test_gql_parse_error() {
5702            let db = GrafeoDB::new_in_memory();
5703            let session = db.session();
5704
5705            // Invalid GQL syntax
5706            let result = session.execute("MATCH (n RETURN n");
5707
5708            assert!(result.is_err());
5709        }
5710
5711        #[test]
5712        fn test_gql_relationship_traversal() {
5713            let db = GrafeoDB::new_in_memory();
5714            let session = db.session();
5715
5716            // Create a graph: Alix -> Gus, Alix -> Vincent
5717            let alix = session.create_node(&["Person"]);
5718            let gus = session.create_node(&["Person"]);
5719            let vincent = session.create_node(&["Person"]);
5720
5721            session.create_edge(alix, gus, "KNOWS");
5722            session.create_edge(alix, vincent, "KNOWS");
5723
5724            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
5725            let result = session
5726                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
5727                .unwrap();
5728
5729            // Should return 2 rows (Alix->Gus, Alix->Vincent)
5730            assert_eq!(result.row_count(), 2);
5731            assert_eq!(result.column_count(), 2);
5732            assert_eq!(result.columns[0], "a");
5733            assert_eq!(result.columns[1], "b");
5734        }
5735
5736        #[test]
5737        fn test_gql_relationship_with_type_filter() {
5738            let db = GrafeoDB::new_in_memory();
5739            let session = db.session();
5740
5741            // Create a graph: Alix -KNOWS-> Gus, Alix -WORKS_WITH-> Vincent
5742            let alix = session.create_node(&["Person"]);
5743            let gus = session.create_node(&["Person"]);
5744            let vincent = session.create_node(&["Person"]);
5745
5746            session.create_edge(alix, gus, "KNOWS");
5747            session.create_edge(alix, vincent, "WORKS_WITH");
5748
5749            // Query only KNOWS relationships
5750            let result = session
5751                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
5752                .unwrap();
5753
5754            // Should return only 1 row (Alix->Gus)
5755            assert_eq!(result.row_count(), 1);
5756        }
5757
5758        #[test]
5759        fn test_gql_semantic_error_undefined_variable() {
5760            let db = GrafeoDB::new_in_memory();
5761            let session = db.session();
5762
5763            // Reference undefined variable 'x' in RETURN
5764            let result = session.execute("MATCH (n:Person) RETURN x");
5765
5766            // Should fail with semantic error
5767            assert!(result.is_err());
5768            let Err(err) = result else {
5769                panic!("Expected error")
5770            };
5771            assert!(
5772                err.to_string().contains("Undefined variable"),
5773                "Expected undefined variable error, got: {}",
5774                err
5775            );
5776        }
5777
5778        #[test]
5779        fn test_gql_where_clause_property_filter() {
5780            use grafeo_common::types::Value;
5781
5782            let db = GrafeoDB::new_in_memory();
5783            let session = db.session();
5784
5785            // Create people with ages
5786            session
5787                .create_node_with_props(&["Person"], [("age", Value::Int64(25))])
5788                .unwrap();
5789            session
5790                .create_node_with_props(&["Person"], [("age", Value::Int64(35))])
5791                .unwrap();
5792            session
5793                .create_node_with_props(&["Person"], [("age", Value::Int64(45))])
5794                .unwrap();
5795
5796            // Query with WHERE clause: age > 30
5797            let result = session
5798                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
5799                .unwrap();
5800
5801            // Should return 2 people (ages 35 and 45)
5802            assert_eq!(result.row_count(), 2);
5803        }
5804
5805        #[test]
5806        fn test_gql_where_clause_equality() {
5807            use grafeo_common::types::Value;
5808
5809            let db = GrafeoDB::new_in_memory();
5810            let session = db.session();
5811
5812            // Create people with names
5813            session
5814                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5815                .unwrap();
5816            session
5817                .create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))])
5818                .unwrap();
5819            session
5820                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5821                .unwrap();
5822
5823            // Query with WHERE clause: name = "Alix"
5824            let result = session
5825                .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
5826                .unwrap();
5827
5828            // Should return 2 people named Alix
5829            assert_eq!(result.row_count(), 2);
5830        }
5831
5832        #[test]
5833        fn test_gql_return_property_access() {
5834            use grafeo_common::types::Value;
5835
5836            let db = GrafeoDB::new_in_memory();
5837            let session = db.session();
5838
5839            // Create people with names and ages
5840            session
5841                .create_node_with_props(
5842                    &["Person"],
5843                    [
5844                        ("name", Value::String("Alix".into())),
5845                        ("age", Value::Int64(30)),
5846                    ],
5847                )
5848                .unwrap();
5849            session
5850                .create_node_with_props(
5851                    &["Person"],
5852                    [
5853                        ("name", Value::String("Gus".into())),
5854                        ("age", Value::Int64(25)),
5855                    ],
5856                )
5857                .unwrap();
5858
5859            // Query returning properties
5860            let result = session
5861                .execute("MATCH (n:Person) RETURN n.name, n.age")
5862                .unwrap();
5863
5864            // Should return 2 rows with name and age columns
5865            assert_eq!(result.row_count(), 2);
5866            assert_eq!(result.column_count(), 2);
5867            assert_eq!(result.columns[0], "n.name");
5868            assert_eq!(result.columns[1], "n.age");
5869
5870            // Check that we get actual values
5871            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
5872            assert!(names.contains(&&Value::String("Alix".into())));
5873            assert!(names.contains(&&Value::String("Gus".into())));
5874        }
5875
5876        #[test]
5877        fn test_gql_return_mixed_expressions() {
5878            use grafeo_common::types::Value;
5879
5880            let db = GrafeoDB::new_in_memory();
5881            let session = db.session();
5882
5883            // Create a person
5884            session
5885                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5886                .unwrap();
5887
5888            // Query returning both node and property
5889            let result = session
5890                .execute("MATCH (n:Person) RETURN n, n.name")
5891                .unwrap();
5892
5893            assert_eq!(result.row_count(), 1);
5894            assert_eq!(result.column_count(), 2);
5895            assert_eq!(result.columns[0], "n");
5896            assert_eq!(result.columns[1], "n.name");
5897
5898            // Second column should be the name
5899            assert_eq!(result.rows[0][1], Value::String("Alix".into()));
5900        }
5901    }
5902
5903    #[cfg(feature = "cypher")]
5904    mod cypher_tests {
5905        use super::*;
5906
5907        #[test]
5908        fn test_cypher_query_execution() {
5909            let db = GrafeoDB::new_in_memory();
5910            let session = db.session();
5911
5912            // Create some test data
5913            session.create_node(&["Person"]);
5914            session.create_node(&["Person"]);
5915            session.create_node(&["Animal"]);
5916
5917            // Execute a Cypher query
5918            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
5919
5920            // Should return 2 Person nodes
5921            assert_eq!(result.row_count(), 2);
5922            assert_eq!(result.column_count(), 1);
5923            assert_eq!(result.columns[0], "n");
5924        }
5925
5926        #[test]
5927        fn test_cypher_empty_result() {
5928            let db = GrafeoDB::new_in_memory();
5929            let session = db.session();
5930
5931            // No data in database
5932            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
5933
5934            assert_eq!(result.row_count(), 0);
5935        }
5936
5937        #[test]
5938        fn test_cypher_parse_error() {
5939            let db = GrafeoDB::new_in_memory();
5940            let session = db.session();
5941
5942            // Invalid Cypher syntax
5943            let result = session.execute_cypher("MATCH (n RETURN n");
5944
5945            assert!(result.is_err());
5946        }
5947    }
5948
5949    // ==================== Direct Lookup API Tests ====================
5950
5951    mod direct_lookup_tests {
5952        use super::*;
5953        use grafeo_common::types::Value;
5954
5955        #[test]
5956        fn test_get_node() {
5957            let db = GrafeoDB::new_in_memory();
5958            let session = db.session();
5959
5960            let id = session.create_node(&["Person"]);
5961            let node = session.get_node(id);
5962
5963            assert!(node.is_some());
5964            let node = node.unwrap();
5965            assert_eq!(node.id, id);
5966        }
5967
5968        #[test]
5969        fn test_get_node_not_found() {
5970            use grafeo_common::types::NodeId;
5971
5972            let db = GrafeoDB::new_in_memory();
5973            let session = db.session();
5974
5975            // Try to get a non-existent node
5976            let node = session.get_node(NodeId::new(9999));
5977            assert!(node.is_none());
5978        }
5979
5980        #[test]
5981        fn test_get_node_property() {
5982            let db = GrafeoDB::new_in_memory();
5983            let session = db.session();
5984
5985            let id = session
5986                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5987                .unwrap();
5988
5989            let name = session.get_node_property(id, "name");
5990            assert_eq!(name, Some(Value::String("Alix".into())));
5991
5992            // Non-existent property
5993            let missing = session.get_node_property(id, "missing");
5994            assert!(missing.is_none());
5995        }
5996
5997        #[test]
5998        fn test_get_edge() {
5999            let db = GrafeoDB::new_in_memory();
6000            let session = db.session();
6001
6002            let alix = session.create_node(&["Person"]);
6003            let gus = session.create_node(&["Person"]);
6004            let edge_id = session.create_edge(alix, gus, "KNOWS");
6005
6006            let edge = session.get_edge(edge_id);
6007            assert!(edge.is_some());
6008            let edge = edge.unwrap();
6009            assert_eq!(edge.id, edge_id);
6010            assert_eq!(edge.src, alix);
6011            assert_eq!(edge.dst, gus);
6012        }
6013
6014        #[test]
6015        fn test_get_edge_not_found() {
6016            use grafeo_common::types::EdgeId;
6017
6018            let db = GrafeoDB::new_in_memory();
6019            let session = db.session();
6020
6021            let edge = session.get_edge(EdgeId::new(9999));
6022            assert!(edge.is_none());
6023        }
6024
6025        #[test]
6026        fn test_get_neighbors_outgoing() {
6027            let db = GrafeoDB::new_in_memory();
6028            let session = db.session();
6029
6030            let alix = session.create_node(&["Person"]);
6031            let gus = session.create_node(&["Person"]);
6032            let harm = session.create_node(&["Person"]);
6033
6034            session.create_edge(alix, gus, "KNOWS");
6035            session.create_edge(alix, harm, "KNOWS");
6036
6037            let neighbors = session.get_neighbors_outgoing(alix);
6038            assert_eq!(neighbors.len(), 2);
6039
6040            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
6041            assert!(neighbor_ids.contains(&gus));
6042            assert!(neighbor_ids.contains(&harm));
6043        }
6044
6045        #[test]
6046        fn test_get_neighbors_incoming() {
6047            let db = GrafeoDB::new_in_memory();
6048            let session = db.session();
6049
6050            let alix = session.create_node(&["Person"]);
6051            let gus = session.create_node(&["Person"]);
6052            let harm = session.create_node(&["Person"]);
6053
6054            session.create_edge(gus, alix, "KNOWS");
6055            session.create_edge(harm, alix, "KNOWS");
6056
6057            let neighbors = session.get_neighbors_incoming(alix);
6058            assert_eq!(neighbors.len(), 2);
6059
6060            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
6061            assert!(neighbor_ids.contains(&gus));
6062            assert!(neighbor_ids.contains(&harm));
6063        }
6064
6065        #[test]
6066        fn test_get_neighbors_outgoing_by_type() {
6067            let db = GrafeoDB::new_in_memory();
6068            let session = db.session();
6069
6070            let alix = session.create_node(&["Person"]);
6071            let gus = session.create_node(&["Person"]);
6072            let company = session.create_node(&["Company"]);
6073
6074            session.create_edge(alix, gus, "KNOWS");
6075            session.create_edge(alix, company, "WORKS_AT");
6076
6077            let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
6078            assert_eq!(knows_neighbors.len(), 1);
6079            assert_eq!(knows_neighbors[0].0, gus);
6080
6081            let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
6082            assert_eq!(works_neighbors.len(), 1);
6083            assert_eq!(works_neighbors[0].0, company);
6084
6085            // No edges of this type
6086            let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
6087            assert!(no_neighbors.is_empty());
6088        }
6089
6090        #[test]
6091        fn test_node_exists() {
6092            use grafeo_common::types::NodeId;
6093
6094            let db = GrafeoDB::new_in_memory();
6095            let session = db.session();
6096
6097            let id = session.create_node(&["Person"]);
6098
6099            assert!(session.node_exists(id));
6100            assert!(!session.node_exists(NodeId::new(9999)));
6101        }
6102
6103        #[test]
6104        fn test_edge_exists() {
6105            use grafeo_common::types::EdgeId;
6106
6107            let db = GrafeoDB::new_in_memory();
6108            let session = db.session();
6109
6110            let alix = session.create_node(&["Person"]);
6111            let gus = session.create_node(&["Person"]);
6112            let edge_id = session.create_edge(alix, gus, "KNOWS");
6113
6114            assert!(session.edge_exists(edge_id));
6115            assert!(!session.edge_exists(EdgeId::new(9999)));
6116        }
6117
6118        #[test]
6119        fn test_get_degree() {
6120            let db = GrafeoDB::new_in_memory();
6121            let session = db.session();
6122
6123            let alix = session.create_node(&["Person"]);
6124            let gus = session.create_node(&["Person"]);
6125            let harm = session.create_node(&["Person"]);
6126
6127            // Alix knows Gus and Harm (2 outgoing)
6128            session.create_edge(alix, gus, "KNOWS");
6129            session.create_edge(alix, harm, "KNOWS");
6130            // Gus knows Alix (1 incoming for Alix)
6131            session.create_edge(gus, alix, "KNOWS");
6132
6133            let (out_degree, in_degree) = session.get_degree(alix);
6134            assert_eq!(out_degree, 2);
6135            assert_eq!(in_degree, 1);
6136
6137            // Node with no edges
6138            let lonely = session.create_node(&["Person"]);
6139            let (out, in_deg) = session.get_degree(lonely);
6140            assert_eq!(out, 0);
6141            assert_eq!(in_deg, 0);
6142        }
6143
6144        #[test]
6145        fn test_get_nodes_batch() {
6146            let db = GrafeoDB::new_in_memory();
6147            let session = db.session();
6148
6149            let alix = session.create_node(&["Person"]);
6150            let gus = session.create_node(&["Person"]);
6151            let harm = session.create_node(&["Person"]);
6152
6153            let nodes = session.get_nodes_batch(&[alix, gus, harm]);
6154            assert_eq!(nodes.len(), 3);
6155            assert!(nodes[0].is_some());
6156            assert!(nodes[1].is_some());
6157            assert!(nodes[2].is_some());
6158
6159            // With non-existent node
6160            use grafeo_common::types::NodeId;
6161            let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
6162            assert_eq!(nodes_with_missing.len(), 3);
6163            assert!(nodes_with_missing[0].is_some());
6164            assert!(nodes_with_missing[1].is_none()); // Missing node
6165            assert!(nodes_with_missing[2].is_some());
6166        }
6167
6168        #[test]
6169        fn test_auto_commit_setting() {
6170            let db = GrafeoDB::new_in_memory();
6171            let mut session = db.session();
6172
6173            // Default is auto-commit enabled
6174            assert!(session.auto_commit());
6175
6176            session.set_auto_commit(false);
6177            assert!(!session.auto_commit());
6178
6179            session.set_auto_commit(true);
6180            assert!(session.auto_commit());
6181        }
6182
6183        #[test]
6184        fn test_transaction_double_begin_nests() {
6185            let db = GrafeoDB::new_in_memory();
6186            let mut session = db.session();
6187
6188            session.begin_transaction().unwrap();
6189            // Second begin_transaction creates a nested transaction (auto-savepoint)
6190            let result = session.begin_transaction();
6191            assert!(result.is_ok());
6192            // Commit the inner (releases savepoint)
6193            session.commit().unwrap();
6194            // Commit the outer
6195            session.commit().unwrap();
6196        }
6197
6198        #[test]
6199        fn test_commit_without_transaction_error() {
6200            let db = GrafeoDB::new_in_memory();
6201            let mut session = db.session();
6202
6203            let result = session.commit();
6204            assert!(result.is_err());
6205        }
6206
6207        #[test]
6208        fn test_rollback_without_transaction_error() {
6209            let db = GrafeoDB::new_in_memory();
6210            let mut session = db.session();
6211
6212            let result = session.rollback();
6213            assert!(result.is_err());
6214        }
6215
6216        #[test]
6217        fn test_create_edge_in_transaction() {
6218            let db = GrafeoDB::new_in_memory();
6219            let mut session = db.session();
6220
6221            // Create nodes outside transaction
6222            let alix = session.create_node(&["Person"]);
6223            let gus = session.create_node(&["Person"]);
6224
6225            // Create edge in transaction
6226            session.begin_transaction().unwrap();
6227            let edge_id = session.create_edge(alix, gus, "KNOWS");
6228
6229            // Edge should be visible in the transaction
6230            assert!(session.edge_exists(edge_id));
6231
6232            // Commit
6233            session.commit().unwrap();
6234
6235            // Edge should still be visible
6236            assert!(session.edge_exists(edge_id));
6237        }
6238
6239        #[test]
6240        fn test_neighbors_empty_node() {
6241            let db = GrafeoDB::new_in_memory();
6242            let session = db.session();
6243
6244            let lonely = session.create_node(&["Person"]);
6245
6246            assert!(session.get_neighbors_outgoing(lonely).is_empty());
6247            assert!(session.get_neighbors_incoming(lonely).is_empty());
6248            assert!(
6249                session
6250                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
6251                    .is_empty()
6252            );
6253        }
6254    }
6255
6256    #[test]
6257    fn test_auto_gc_triggers_on_commit_interval() {
6258        use crate::config::Config;
6259
6260        let config = Config::in_memory().with_gc_interval(2);
6261        let db = GrafeoDB::with_config(config).unwrap();
6262        let mut session = db.session();
6263
6264        // First commit: counter = 1, no GC (not a multiple of 2)
6265        session.begin_transaction().unwrap();
6266        session.create_node(&["A"]);
6267        session.commit().unwrap();
6268
6269        // Second commit: counter = 2, GC should trigger (multiple of 2)
6270        session.begin_transaction().unwrap();
6271        session.create_node(&["B"]);
6272        session.commit().unwrap();
6273
6274        // Verify the database is still functional after GC
6275        assert_eq!(db.node_count(), 2);
6276    }
6277
6278    #[test]
6279    fn test_query_timeout_config_propagates_to_session() {
6280        use crate::config::Config;
6281        use std::time::Duration;
6282
6283        let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
6284        let db = GrafeoDB::with_config(config).unwrap();
6285        let session = db.session();
6286
6287        // Verify the session has a query deadline (timeout was set)
6288        assert!(session.query_deadline().is_some());
6289    }
6290
6291    #[test]
6292    fn test_default_query_timeout_returns_deadline() {
6293        let db = GrafeoDB::new_in_memory();
6294        let session = db.session();
6295
6296        // Default config has 30s timeout
6297        assert!(session.query_deadline().is_some());
6298    }
6299
6300    #[test]
6301    fn test_no_query_timeout_returns_no_deadline() {
6302        use crate::config::Config;
6303
6304        let config = Config::in_memory().without_query_timeout();
6305        let db = GrafeoDB::with_config(config).unwrap();
6306        let session = db.session();
6307
6308        assert!(session.query_deadline().is_none());
6309    }
6310
6311    #[test]
6312    fn test_graph_model_accessor() {
6313        use crate::config::GraphModel;
6314
6315        let db = GrafeoDB::new_in_memory();
6316        let session = db.session();
6317
6318        assert_eq!(session.graph_model(), GraphModel::Lpg);
6319    }
6320
6321    #[test]
6322    fn test_reject_oversized_property() {
6323        use crate::config::Config;
6324
6325        let config = Config::in_memory().with_max_property_size(100);
6326        let db = GrafeoDB::with_config(config).unwrap();
6327        let session = db.session();
6328
6329        let node = session.create_node(&["Test"]);
6330
6331        // Small property should succeed
6332        session
6333            .set_node_property(node, "small", Value::from("hello"))
6334            .unwrap();
6335
6336        // Large property should be rejected
6337        let big = "x".repeat(200);
6338        let result = session.set_node_property(node, "big", Value::from(big.as_str()));
6339        assert!(result.is_err());
6340        let err = result.unwrap_err().to_string();
6341        assert!(
6342            err.contains("exceeds maximum size"),
6343            "Expected size error, got: {err}"
6344        );
6345    }
6346
6347    #[test]
6348    fn test_no_property_size_limit() {
6349        use crate::config::Config;
6350
6351        let config = Config::in_memory().without_max_property_size();
6352        let db = GrafeoDB::with_config(config).unwrap();
6353        let session = db.session();
6354
6355        let node = session.create_node(&["Test"]);
6356
6357        // Even large properties should succeed with no limit
6358        let big = "x".repeat(10_000);
6359        session
6360            .set_node_property(node, "big", Value::from(big.as_str()))
6361            .unwrap();
6362    }
6363
6364    #[cfg(feature = "gql")]
6365    #[test]
6366    fn test_external_store_session() {
6367        use grafeo_core::graph::GraphStoreMut;
6368        use std::sync::Arc;
6369
6370        let config = crate::config::Config::in_memory();
6371        let store =
6372            Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
6373        let db = GrafeoDB::with_store(store, config).unwrap();
6374
6375        let mut session = db.session();
6376
6377        // Use an explicit transaction so that INSERT and MATCH share the same
6378        // transaction context. With PENDING epochs, uncommitted versions are
6379        // only visible to the owning transaction.
6380        session.begin_transaction().unwrap();
6381        session.execute("INSERT (:Test {name: 'hello'})").unwrap();
6382
6383        // Verify we can query through it within the same transaction
6384        let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
6385        assert_eq!(result.row_count(), 1);
6386
6387        session.commit().unwrap();
6388    }
6389
6390    // ==================== Session Command Tests ====================
6391
6392    #[cfg(feature = "gql")]
6393    mod session_command_tests {
6394        use super::*;
6395        use grafeo_common::types::Value;
6396
6397        #[test]
6398        fn test_use_graph_sets_current_graph() {
6399            let db = GrafeoDB::new_in_memory();
6400            let session = db.session();
6401
6402            // Create the graph first, then USE it
6403            session.execute("CREATE GRAPH mydb").unwrap();
6404            session.execute("USE GRAPH mydb").unwrap();
6405
6406            assert_eq!(session.current_graph(), Some("mydb".to_string()));
6407        }
6408
6409        #[test]
6410        fn test_use_graph_nonexistent_errors() {
6411            let db = GrafeoDB::new_in_memory();
6412            let session = db.session();
6413
6414            let result = session.execute("USE GRAPH doesnotexist");
6415            assert!(result.is_err());
6416            let err = result.unwrap_err().to_string();
6417            assert!(
6418                err.contains("does not exist"),
6419                "Expected 'does not exist' error, got: {err}"
6420            );
6421        }
6422
6423        #[test]
6424        fn test_use_graph_default_always_valid() {
6425            let db = GrafeoDB::new_in_memory();
6426            let session = db.session();
6427
6428            // "default" is always valid, even without CREATE GRAPH
6429            session.execute("USE GRAPH default").unwrap();
6430            assert_eq!(session.current_graph(), Some("default".to_string()));
6431        }
6432
6433        #[test]
6434        fn test_session_set_graph() {
6435            let db = GrafeoDB::new_in_memory();
6436            let session = db.session();
6437
6438            session.execute("CREATE GRAPH analytics").unwrap();
6439            session.execute("SESSION SET GRAPH analytics").unwrap();
6440            assert_eq!(session.current_graph(), Some("analytics".to_string()));
6441        }
6442
6443        #[test]
6444        fn test_session_set_graph_nonexistent_errors() {
6445            let db = GrafeoDB::new_in_memory();
6446            let session = db.session();
6447
6448            let result = session.execute("SESSION SET GRAPH nosuchgraph");
6449            assert!(result.is_err());
6450        }
6451
6452        #[test]
6453        fn test_session_set_time_zone() {
6454            let db = GrafeoDB::new_in_memory();
6455            let session = db.session();
6456
6457            assert_eq!(session.time_zone(), None);
6458
6459            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
6460            assert_eq!(session.time_zone(), Some("UTC".to_string()));
6461
6462            session
6463                .execute("SESSION SET TIME ZONE 'America/New_York'")
6464                .unwrap();
6465            assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
6466        }
6467
6468        #[test]
6469        fn test_session_set_parameter() {
6470            let db = GrafeoDB::new_in_memory();
6471            let session = db.session();
6472
6473            session
6474                .execute("SESSION SET PARAMETER $timeout = 30")
6475                .unwrap();
6476
6477            // Parameter is stored (value is Null for now, since expression
6478            // evaluation is not yet wired up)
6479            assert!(session.get_parameter("timeout").is_some());
6480        }
6481
6482        #[test]
6483        fn test_session_reset_clears_all_state() {
6484            let db = GrafeoDB::new_in_memory();
6485            let session = db.session();
6486
6487            // Set various session state
6488            session.execute("CREATE GRAPH analytics").unwrap();
6489            session.execute("SESSION SET GRAPH analytics").unwrap();
6490            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
6491            session
6492                .execute("SESSION SET PARAMETER $limit = 100")
6493                .unwrap();
6494
6495            // Verify state was set
6496            assert!(session.current_graph().is_some());
6497            assert!(session.time_zone().is_some());
6498            assert!(session.get_parameter("limit").is_some());
6499
6500            // Reset everything
6501            session.execute("SESSION RESET").unwrap();
6502
6503            assert_eq!(session.current_graph(), None);
6504            assert_eq!(session.time_zone(), None);
6505            assert!(session.get_parameter("limit").is_none());
6506        }
6507
6508        #[test]
6509        fn test_session_close_clears_state() {
6510            let db = GrafeoDB::new_in_memory();
6511            let session = db.session();
6512
6513            session.execute("CREATE GRAPH analytics").unwrap();
6514            session.execute("SESSION SET GRAPH analytics").unwrap();
6515            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
6516
6517            session.execute("SESSION CLOSE").unwrap();
6518
6519            assert_eq!(session.current_graph(), None);
6520            assert_eq!(session.time_zone(), None);
6521        }
6522
6523        #[test]
6524        fn test_create_graph() {
6525            let db = GrafeoDB::new_in_memory();
6526            let session = db.session();
6527
6528            session.execute("CREATE GRAPH mydb").unwrap();
6529
6530            // Should be able to USE it now
6531            session.execute("USE GRAPH mydb").unwrap();
6532            assert_eq!(session.current_graph(), Some("mydb".to_string()));
6533        }
6534
6535        #[test]
6536        fn test_create_graph_duplicate_errors() {
6537            let db = GrafeoDB::new_in_memory();
6538            let session = db.session();
6539
6540            session.execute("CREATE GRAPH mydb").unwrap();
6541            let result = session.execute("CREATE GRAPH mydb");
6542
6543            assert!(result.is_err());
6544            let err = result.unwrap_err().to_string();
6545            assert!(
6546                err.contains("already exists"),
6547                "Expected 'already exists' error, got: {err}"
6548            );
6549        }
6550
6551        #[test]
6552        fn test_create_graph_if_not_exists() {
6553            let db = GrafeoDB::new_in_memory();
6554            let session = db.session();
6555
6556            session.execute("CREATE GRAPH mydb").unwrap();
6557            // Should succeed silently with IF NOT EXISTS
6558            session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
6559        }
6560
6561        #[test]
6562        fn test_drop_graph() {
6563            let db = GrafeoDB::new_in_memory();
6564            let session = db.session();
6565
6566            session.execute("CREATE GRAPH mydb").unwrap();
6567            session.execute("DROP GRAPH mydb").unwrap();
6568
6569            // Should no longer be usable
6570            let result = session.execute("USE GRAPH mydb");
6571            assert!(result.is_err());
6572        }
6573
6574        #[test]
6575        fn test_drop_graph_nonexistent_errors() {
6576            let db = GrafeoDB::new_in_memory();
6577            let session = db.session();
6578
6579            let result = session.execute("DROP GRAPH nosuchgraph");
6580            assert!(result.is_err());
6581            let err = result.unwrap_err().to_string();
6582            assert!(
6583                err.contains("does not exist"),
6584                "Expected 'does not exist' error, got: {err}"
6585            );
6586        }
6587
6588        #[test]
6589        fn test_drop_graph_if_exists() {
6590            let db = GrafeoDB::new_in_memory();
6591            let session = db.session();
6592
6593            // Should succeed silently with IF EXISTS
6594            session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
6595        }
6596
6597        #[test]
6598        fn test_start_transaction_via_gql() {
6599            let db = GrafeoDB::new_in_memory();
6600            let session = db.session();
6601
6602            session.execute("START TRANSACTION").unwrap();
6603            assert!(session.in_transaction());
6604            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
6605            session.execute("COMMIT").unwrap();
6606            assert!(!session.in_transaction());
6607
6608            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
6609            assert_eq!(result.rows.len(), 1);
6610        }
6611
6612        #[test]
6613        fn test_start_transaction_read_only_blocks_insert() {
6614            let db = GrafeoDB::new_in_memory();
6615            let session = db.session();
6616
6617            session.execute("START TRANSACTION READ ONLY").unwrap();
6618            let result = session.execute("INSERT (:Person {name: 'Alix'})");
6619            assert!(result.is_err());
6620            let err = result.unwrap_err().to_string();
6621            assert!(
6622                err.contains("read-only"),
6623                "Expected read-only error, got: {err}"
6624            );
6625            session.execute("ROLLBACK").unwrap();
6626        }
6627
6628        #[test]
6629        fn test_start_transaction_read_only_allows_reads() {
6630            let db = GrafeoDB::new_in_memory();
6631            let mut session = db.session();
6632            session.begin_transaction().unwrap();
6633            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
6634            session.commit().unwrap();
6635
6636            session.execute("START TRANSACTION READ ONLY").unwrap();
6637            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
6638            assert_eq!(result.rows.len(), 1);
6639            session.execute("COMMIT").unwrap();
6640        }
6641
6642        #[test]
6643        fn test_rollback_via_gql() {
6644            let db = GrafeoDB::new_in_memory();
6645            let session = db.session();
6646
6647            session.execute("START TRANSACTION").unwrap();
6648            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
6649            session.execute("ROLLBACK").unwrap();
6650
6651            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
6652            assert!(result.rows.is_empty());
6653        }
6654
6655        #[test]
6656        fn test_start_transaction_with_isolation_level() {
6657            let db = GrafeoDB::new_in_memory();
6658            let session = db.session();
6659
6660            session
6661                .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
6662                .unwrap();
6663            assert!(session.in_transaction());
6664            session.execute("ROLLBACK").unwrap();
6665        }
6666
6667        #[test]
6668        fn test_session_commands_return_empty_result() {
6669            let db = GrafeoDB::new_in_memory();
6670            let session = db.session();
6671
6672            session.execute("CREATE GRAPH test").unwrap();
6673            let result = session.execute("SESSION SET GRAPH test").unwrap();
6674            assert_eq!(result.row_count(), 0);
6675            assert_eq!(result.column_count(), 0);
6676        }
6677
6678        #[test]
6679        fn test_current_graph_default_is_none() {
6680            let db = GrafeoDB::new_in_memory();
6681            let session = db.session();
6682
6683            assert_eq!(session.current_graph(), None);
6684        }
6685
6686        #[test]
6687        fn test_time_zone_default_is_none() {
6688            let db = GrafeoDB::new_in_memory();
6689            let session = db.session();
6690
6691            assert_eq!(session.time_zone(), None);
6692        }
6693
6694        #[test]
6695        fn test_session_state_independent_across_sessions() {
6696            let db = GrafeoDB::new_in_memory();
6697            let session1 = db.session();
6698            let session2 = db.session();
6699
6700            session1.execute("CREATE GRAPH first").unwrap();
6701            session1.execute("CREATE GRAPH second").unwrap();
6702            session1.execute("SESSION SET GRAPH first").unwrap();
6703            session2.execute("SESSION SET GRAPH second").unwrap();
6704
6705            assert_eq!(session1.current_graph(), Some("first".to_string()));
6706            assert_eq!(session2.current_graph(), Some("second".to_string()));
6707        }
6708
6709        #[test]
6710        fn test_show_node_types() {
6711            let db = GrafeoDB::new_in_memory();
6712            let session = db.session();
6713
6714            session
6715                .execute("CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)")
6716                .unwrap();
6717
6718            let result = session.execute("SHOW NODE TYPES").unwrap();
6719            assert_eq!(
6720                result.columns,
6721                vec!["name", "properties", "constraints", "parents"]
6722            );
6723            assert_eq!(result.rows.len(), 1);
6724            // First column is the type name
6725            assert_eq!(result.rows[0][0], Value::from("Person"));
6726        }
6727
6728        #[test]
6729        fn test_show_edge_types() {
6730            let db = GrafeoDB::new_in_memory();
6731            let session = db.session();
6732
6733            session
6734                .execute("CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)")
6735                .unwrap();
6736
6737            let result = session.execute("SHOW EDGE TYPES").unwrap();
6738            assert_eq!(
6739                result.columns,
6740                vec!["name", "properties", "source_types", "target_types"]
6741            );
6742            assert_eq!(result.rows.len(), 1);
6743            assert_eq!(result.rows[0][0], Value::from("KNOWS"));
6744        }
6745
6746        #[test]
6747        fn test_show_graph_types() {
6748            let db = GrafeoDB::new_in_memory();
6749            let session = db.session();
6750
6751            session
6752                .execute("CREATE NODE TYPE Person (name STRING)")
6753                .unwrap();
6754            session
6755                .execute(
6756                    "CREATE GRAPH TYPE social (\
6757                        NODE TYPE Person (name STRING)\
6758                    )",
6759                )
6760                .unwrap();
6761
6762            let result = session.execute("SHOW GRAPH TYPES").unwrap();
6763            assert_eq!(
6764                result.columns,
6765                vec!["name", "open", "node_types", "edge_types"]
6766            );
6767            assert_eq!(result.rows.len(), 1);
6768            assert_eq!(result.rows[0][0], Value::from("social"));
6769        }
6770
6771        #[test]
6772        fn test_show_graph_type_named() {
6773            let db = GrafeoDB::new_in_memory();
6774            let session = db.session();
6775
6776            session
6777                .execute("CREATE NODE TYPE Person (name STRING)")
6778                .unwrap();
6779            session
6780                .execute(
6781                    "CREATE GRAPH TYPE social (\
6782                        NODE TYPE Person (name STRING)\
6783                    )",
6784                )
6785                .unwrap();
6786
6787            let result = session.execute("SHOW GRAPH TYPE social").unwrap();
6788            assert_eq!(result.rows.len(), 1);
6789            assert_eq!(result.rows[0][0], Value::from("social"));
6790        }
6791
6792        #[test]
6793        fn test_show_graph_type_not_found() {
6794            let db = GrafeoDB::new_in_memory();
6795            let session = db.session();
6796
6797            let result = session.execute("SHOW GRAPH TYPE nonexistent");
6798            assert!(result.is_err());
6799        }
6800
6801        #[test]
6802        fn test_show_indexes_via_gql() {
6803            let db = GrafeoDB::new_in_memory();
6804            let session = db.session();
6805
6806            let result = session.execute("SHOW INDEXES").unwrap();
6807            assert_eq!(result.columns, vec!["name", "type", "label", "property"]);
6808        }
6809
6810        #[test]
6811        fn test_show_constraints_via_gql() {
6812            let db = GrafeoDB::new_in_memory();
6813            let session = db.session();
6814
6815            let result = session.execute("SHOW CONSTRAINTS").unwrap();
6816            assert_eq!(result.columns, vec!["name", "type", "label", "properties"]);
6817        }
6818
6819        #[test]
6820        fn test_pattern_form_graph_type_roundtrip() {
6821            let db = GrafeoDB::new_in_memory();
6822            let session = db.session();
6823
6824            // Register the types first
6825            session
6826                .execute("CREATE NODE TYPE Person (name STRING NOT NULL)")
6827                .unwrap();
6828            session
6829                .execute("CREATE NODE TYPE City (name STRING)")
6830                .unwrap();
6831            session
6832                .execute("CREATE EDGE TYPE KNOWS (since INTEGER)")
6833                .unwrap();
6834            session.execute("CREATE EDGE TYPE LIVES_IN").unwrap();
6835
6836            // Create graph type using pattern form
6837            session
6838                .execute(
6839                    "CREATE GRAPH TYPE social (\
6840                        (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
6841                        (:Person)-[:LIVES_IN]->(:City)\
6842                    )",
6843                )
6844                .unwrap();
6845
6846            // Verify it was created
6847            let result = session.execute("SHOW GRAPH TYPE social").unwrap();
6848            assert_eq!(result.rows.len(), 1);
6849            assert_eq!(result.rows[0][0], Value::from("social"));
6850        }
6851    }
6852}