Skip to main content

grafeo_engine/session/
mod.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7#[cfg(feature = "triple-store")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::AtomicUsize;
12#[cfg(feature = "lpg")]
13use std::sync::atomic::Ordering;
14use std::time::{Duration, Instant};
15
16#[cfg(feature = "lpg")]
17use grafeo_common::grafeo_debug_span;
18#[cfg(feature = "lpg")]
19use grafeo_common::types::{EdgeId, NodeId};
20use grafeo_common::types::{EpochId, TransactionId, Value};
21use grafeo_common::utils::error::Result;
22use grafeo_common::{grafeo_info_span, grafeo_warn};
23#[cfg(feature = "lpg")]
24use grafeo_core::graph::Direction;
25#[cfg(feature = "lpg")]
26use grafeo_core::graph::lpg::LpgStore;
27#[cfg(feature = "lpg")]
28use grafeo_core::graph::lpg::{Edge, Node};
29#[cfg(feature = "triple-store")]
30use grafeo_core::graph::rdf::RdfStore;
31use grafeo_core::graph::{GraphStore, GraphStoreMut};
32
33use crate::catalog::{Catalog, CatalogConstraintValidator};
34use crate::config::{AdaptiveConfig, GraphModel};
35use crate::database::QueryResult;
36use crate::query::cache::QueryCache;
37use crate::transaction::TransactionManager;
38
39/// Storage key suffix for the implicit default graph within a schema.
40/// Auto-created by `CREATE SCHEMA` and auto-dropped by `DROP SCHEMA`.
41const SCHEMA_DEFAULT_GRAPH: &str = "__default__";
42
43/// Parses a DDL default-value literal string into a [`Value`].
44///
45/// Handles string literals (single- or double-quoted), integers, floats,
46/// booleans (`true`/`false`), and `NULL`.
47fn parse_default_literal(text: &str) -> Value {
48    if text.eq_ignore_ascii_case("null") {
49        return Value::Null;
50    }
51    if text.eq_ignore_ascii_case("true") {
52        return Value::Bool(true);
53    }
54    if text.eq_ignore_ascii_case("false") {
55        return Value::Bool(false);
56    }
57    // String literal: strip surrounding quotes
58    if (text.starts_with('\'') && text.ends_with('\''))
59        || (text.starts_with('"') && text.ends_with('"'))
60    {
61        return Value::String(text[1..text.len() - 1].into());
62    }
63    // Try integer, then float
64    if let Ok(i) = text.parse::<i64>() {
65        return Value::Int64(i);
66    }
67    if let Ok(f) = text.parse::<f64>() {
68        return Value::Float64(f);
69    }
70    // Fallback: treat as string
71    Value::String(text.into())
72}
73
/// Runtime configuration for creating a new session.
///
/// Groups the shared parameters passed to all session constructors, keeping
/// call sites readable and avoiding long argument lists. The session
/// constructors in this module move each field into the matching
/// [`Session`] field.
pub(crate) struct SessionConfig {
    /// Transaction manager handle stored on the session.
    pub transaction_manager: Arc<TransactionManager>,
    /// Query cache handle stored on the session.
    pub query_cache: Arc<QueryCache>,
    /// Schema and metadata catalog handle stored on the session.
    pub catalog: Arc<Catalog>,
    /// Adaptive execution configuration.
    pub adaptive_config: AdaptiveConfig,
    /// Whether to use factorized execution for multi-hop queries.
    pub factorized_execution: bool,
    /// The graph data model the session operates on.
    pub graph_model: GraphModel,
    /// Maximum time a query may run before being cancelled (`None` = no limit set).
    pub query_timeout: Option<Duration>,
    /// Shared commit counter for triggering auto-GC.
    pub commit_counter: Arc<AtomicUsize>,
    /// GC every N commits (0 = disabled).
    pub gc_interval: usize,
    /// When true, the session permanently blocks all mutations.
    /// Seeds both `Session::db_read_only` and the initial `Session::read_only_tx`.
    pub read_only: bool,
}
91
/// Your handle to the database - execute queries and manage transactions.
///
/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
/// tracks its own transaction state, so you can have multiple concurrent
/// sessions without them interfering.
///
/// Per-session mutable state is kept behind `parking_lot::Mutex` or atomics
/// so that it can be changed through `&self`.
pub struct Session {
    /// The underlying concrete LPG store. Named graphs are looked up through
    /// it via `store.graph(name)`.
    #[cfg(feature = "lpg")]
    store: Arc<LpgStore>,
    /// Graph store trait object for pluggable storage backends (read path).
    graph_store: Arc<dyn GraphStore>,
    /// Writable graph store (None for read-only databases).
    graph_store_mut: Option<Arc<dyn GraphStoreMut>>,
    /// Schema and metadata catalog shared across sessions.
    catalog: Arc<Catalog>,
    /// RDF triple store (if RDF feature is enabled).
    #[cfg(feature = "triple-store")]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager.
    transaction_manager: Arc<TransactionManager>,
    /// Query cache shared across sessions.
    query_cache: Arc<QueryCache>,
    /// Current transaction ID (if any). Behind a Mutex so that GQL commands
    /// (`START TRANSACTION`, `COMMIT`, `ROLLBACK`) can manage transactions
    /// from within `execute(&self)`.
    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
    /// Whether the current transaction is read-only (blocks mutations).
    /// Initialised from `SessionConfig::read_only` by the constructors.
    read_only_tx: parking_lot::Mutex<bool>,
    /// Whether the database itself is read-only (set at open time, never changes).
    /// When true, `read_only_tx` is always true regardless of transaction flags.
    db_read_only: bool,
    /// Whether the session is in auto-commit mode.
    /// The constructors in this module start every session with `true`.
    auto_commit: bool,
    /// Adaptive execution configuration.
    #[allow(dead_code)] // Stored for future adaptive re-optimization during execution
    adaptive_config: AdaptiveConfig,
    /// Whether to use factorized execution for multi-hop queries.
    factorized_execution: bool,
    /// The graph data model this session operates on.
    graph_model: GraphModel,
    /// Maximum time a query may run before being cancelled.
    query_timeout: Option<Duration>,
    /// Shared commit counter for triggering auto-GC.
    commit_counter: Arc<AtomicUsize>,
    /// GC every N commits (0 = disabled).
    gc_interval: usize,
    /// Node count at the start of the current transaction (for PreparedCommit stats).
    transaction_start_node_count: AtomicUsize,
    /// Edge count at the start of the current transaction (for PreparedCommit stats).
    transaction_start_edge_count: AtomicUsize,
    /// WAL for logging schema changes. `None` until `set_wal` installs one.
    #[cfg(feature = "wal")]
    wal: Option<Arc<grafeo_storage::wal::LpgWal>>,
    /// Shared WAL graph context tracker for named graph awareness.
    /// Set together with `wal` by `set_wal`.
    #[cfg(feature = "wal")]
    wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
    /// CDC log for change tracking.
    #[cfg(feature = "cdc")]
    cdc_log: Arc<crate::cdc::CdcLog>,
    /// Buffered CDC events for the current transaction.
    /// Flushed to `cdc_log` on commit, discarded on rollback.
    /// `None` until `set_cdc_log` wraps an existing write store.
    #[cfg(feature = "cdc")]
    cdc_pending_events: Option<Arc<parking_lot::Mutex<Vec<crate::cdc::ChangeEvent>>>>,
    /// Current graph name (for multi-graph USE GRAPH support). None = default graph.
    current_graph: parking_lot::Mutex<Option<String>>,
    /// Current schema name (ISO/IEC 39075 Section 4.7.3: independent from session graph).
    /// None = "not set" (uses default schema).
    current_schema: parking_lot::Mutex<Option<String>>,
    /// Session time zone override.
    time_zone: parking_lot::Mutex<Option<String>>,
    /// Session-level parameters (SET PARAMETER).
    session_params:
        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
    /// Override epoch for time-travel queries (None = use transaction/current epoch).
    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
    /// Savepoints within the current transaction.
    savepoints: parking_lot::Mutex<Vec<SavepointState>>,
    /// Nesting depth for nested transactions (0 = outermost).
    /// Nested `START TRANSACTION` creates an auto-savepoint; nested `COMMIT`
    /// releases it, nested `ROLLBACK` rolls back to it.
    transaction_nesting_depth: parking_lot::Mutex<u32>,
    /// Named graphs touched during the current transaction (for cross-graph atomicity).
    /// `None` represents the default graph. Populated at `BEGIN` time and on each
    /// `USE GRAPH` / `SESSION SET GRAPH` switch within a transaction.
    /// Entries are storage keys as produced by `active_graph_storage_key`
    /// and are kept free of duplicates by `track_graph_touch`.
    touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
    /// Shared metrics registry (populated when the `metrics` feature is enabled).
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
    /// Transaction start time for duration tracking.
    #[cfg(feature = "metrics")]
    tx_start_time: parking_lot::Mutex<Option<Instant>>,
}
184
/// Per-graph savepoint snapshot, capturing the store state at the time of the savepoint.
#[derive(Clone)]
struct GraphSavepoint {
    /// Graph this snapshot belongs to.
    /// NOTE(review): `None` presumably denotes the default graph, as in
    /// `Session::touched_graphs` - confirm at the savepoint creation site.
    graph_name: Option<String>,
    /// NOTE(review): presumably the store's next node id when the savepoint
    /// was taken - confirm against the code that fills this in.
    next_node_id: u64,
    /// NOTE(review): presumably the store's next edge id when the savepoint
    /// was taken - confirm against the code that fills this in.
    next_edge_id: u64,
    /// NOTE(review): presumably the undo-log length at savepoint time, used
    /// as the rollback target - confirm against the rollback code.
    undo_log_position: usize,
}
193
/// Savepoint state: name + per-graph snapshots + the graph that was active.
///
/// Instances are stored in `Session::savepoints`.
#[derive(Clone)]
struct SavepointState {
    /// Name identifying the savepoint.
    name: String,
    /// One [`GraphSavepoint`] per graph captured by this savepoint.
    graph_snapshots: Vec<GraphSavepoint>,
    /// The graph that was active when the savepoint was created.
    /// Reserved for future use (e.g., restoring graph context on rollback).
    #[allow(dead_code)]
    active_graph: Option<String>,
    /// CDC event buffer position at savepoint creation.
    /// On rollback-to-savepoint, the buffer is truncated to this position.
    #[cfg(feature = "cdc")]
    cdc_event_position: usize,
}
208
209impl Session {
210    /// Creates a new session with adaptive execution configuration.
211    #[cfg(feature = "lpg")]
212    #[allow(dead_code)] // Used when lpg enabled without triple-store
213    pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
214        let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
215        let graph_store_mut = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
216        Self {
217            store,
218            graph_store,
219            graph_store_mut,
220            catalog: cfg.catalog,
221            #[cfg(feature = "triple-store")]
222            rdf_store: Arc::new(RdfStore::new()),
223            transaction_manager: cfg.transaction_manager,
224            query_cache: cfg.query_cache,
225            current_transaction: parking_lot::Mutex::new(None),
226            read_only_tx: parking_lot::Mutex::new(cfg.read_only),
227            db_read_only: cfg.read_only,
228            auto_commit: true,
229            adaptive_config: cfg.adaptive_config,
230            factorized_execution: cfg.factorized_execution,
231            graph_model: cfg.graph_model,
232            query_timeout: cfg.query_timeout,
233            commit_counter: cfg.commit_counter,
234            gc_interval: cfg.gc_interval,
235            transaction_start_node_count: AtomicUsize::new(0),
236            transaction_start_edge_count: AtomicUsize::new(0),
237            #[cfg(feature = "wal")]
238            wal: None,
239            #[cfg(feature = "wal")]
240            wal_graph_context: None,
241            #[cfg(feature = "cdc")]
242            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
243            #[cfg(feature = "cdc")]
244            cdc_pending_events: None,
245            current_graph: parking_lot::Mutex::new(None),
246            current_schema: parking_lot::Mutex::new(None),
247            time_zone: parking_lot::Mutex::new(None),
248            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
249            viewing_epoch_override: parking_lot::Mutex::new(None),
250            savepoints: parking_lot::Mutex::new(Vec::new()),
251            transaction_nesting_depth: parking_lot::Mutex::new(0),
252            touched_graphs: parking_lot::Mutex::new(Vec::new()),
253            #[cfg(feature = "metrics")]
254            metrics: None,
255            #[cfg(feature = "metrics")]
256            tx_start_time: parking_lot::Mutex::new(None),
257        }
258    }
259
260    /// Sets the WAL for this session (shared with the database).
261    ///
262    /// This also wraps `graph_store` in a [`WalGraphStore`] so that mutation
263    /// operators (INSERT, DELETE, SET via queries) log to the WAL.
264    #[cfg(all(feature = "wal", feature = "lpg"))]
265    pub(crate) fn set_wal(
266        &mut self,
267        wal: Arc<grafeo_storage::wal::LpgWal>,
268        wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
269    ) {
270        // Wrap the graph store so query-engine mutations are WAL-logged
271        let wal_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
272            Arc::clone(&self.store),
273            Arc::clone(&wal),
274            Arc::clone(&wal_graph_context),
275        ));
276        self.graph_store = Arc::clone(&wal_store) as Arc<dyn GraphStore>;
277        self.graph_store_mut = Some(wal_store as Arc<dyn GraphStoreMut>);
278        self.wal = Some(wal);
279        self.wal_graph_context = Some(wal_graph_context);
280    }
281
282    /// Sets the CDC log for this session (shared with the database).
283    ///
284    /// Wraps the current write store with a `CdcGraphStore` decorator so
285    /// that all session mutations (INSERT, SET, DELETE via query execution)
286    /// buffer CDC events. The buffer is flushed to the `CdcLog` on commit
287    /// and discarded on rollback.
288    #[cfg(feature = "cdc")]
289    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
290        // Wrap the WRITE store only with CdcGraphStore to intercept mutations.
291        // The read store (self.graph_store) is left unchanged for zero read overhead.
292        if let Some(ref write_store) = self.graph_store_mut {
293            let cdc_store = Arc::new(crate::database::cdc_store::CdcGraphStore::new(
294                Arc::clone(write_store),
295                Arc::clone(&cdc_log),
296            ));
297            self.cdc_pending_events = Some(cdc_store.pending_events());
298            self.graph_store_mut = Some(cdc_store as Arc<dyn grafeo_core::graph::GraphStoreMut>);
299        }
300        self.cdc_log = cdc_log;
301    }
302
303    /// Sets the metrics registry for this session (shared with the database).
304    #[cfg(feature = "metrics")]
305    pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
306        self.metrics = Some(metrics);
307    }
308
309    /// Creates a session backed by an external graph store.
310    ///
311    /// The external store handles all data operations. Transaction management
312    /// (begin/commit/rollback) is not supported for external stores.
313    ///
314    /// # Errors
315    ///
316    /// Returns an error if the internal arena allocation fails (out of memory).
317    pub(crate) fn with_external_store(
318        read_store: Arc<dyn GraphStore>,
319        write_store: Option<Arc<dyn GraphStoreMut>>,
320        cfg: SessionConfig,
321    ) -> Result<Self> {
322        Ok(Self {
323            #[cfg(feature = "lpg")]
324            store: Arc::new(LpgStore::new()?),
325            graph_store: read_store,
326            graph_store_mut: write_store,
327            catalog: cfg.catalog,
328            #[cfg(feature = "triple-store")]
329            rdf_store: Arc::new(RdfStore::new()),
330            transaction_manager: cfg.transaction_manager,
331            query_cache: cfg.query_cache,
332            current_transaction: parking_lot::Mutex::new(None),
333            read_only_tx: parking_lot::Mutex::new(cfg.read_only),
334            db_read_only: cfg.read_only,
335            auto_commit: true,
336            adaptive_config: cfg.adaptive_config,
337            factorized_execution: cfg.factorized_execution,
338            graph_model: cfg.graph_model,
339            query_timeout: cfg.query_timeout,
340            commit_counter: cfg.commit_counter,
341            gc_interval: cfg.gc_interval,
342            transaction_start_node_count: AtomicUsize::new(0),
343            transaction_start_edge_count: AtomicUsize::new(0),
344            #[cfg(feature = "wal")]
345            wal: None,
346            #[cfg(feature = "wal")]
347            wal_graph_context: None,
348            #[cfg(feature = "cdc")]
349            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
350            #[cfg(feature = "cdc")]
351            cdc_pending_events: None,
352            current_graph: parking_lot::Mutex::new(None),
353            current_schema: parking_lot::Mutex::new(None),
354            time_zone: parking_lot::Mutex::new(None),
355            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
356            viewing_epoch_override: parking_lot::Mutex::new(None),
357            savepoints: parking_lot::Mutex::new(Vec::new()),
358            transaction_nesting_depth: parking_lot::Mutex::new(0),
359            touched_graphs: parking_lot::Mutex::new(Vec::new()),
360            #[cfg(feature = "metrics")]
361            metrics: None,
362            #[cfg(feature = "metrics")]
363            tx_start_time: parking_lot::Mutex::new(None),
364        })
365    }
366
367    /// Returns the graph model this session operates on.
368    #[must_use]
369    pub fn graph_model(&self) -> GraphModel {
370        self.graph_model
371    }
372
373    // === Session State Management ===
374
375    /// Sets the current graph for this session (USE GRAPH).
376    pub fn use_graph(&self, name: &str) {
377        *self.current_graph.lock() = Some(name.to_string());
378    }
379
380    /// Returns the current graph name, if any.
381    #[must_use]
382    pub fn current_graph(&self) -> Option<String> {
383        self.current_graph.lock().clone()
384    }
385
386    /// Sets the current schema for this session (SESSION SET SCHEMA).
387    ///
388    /// Per ISO/IEC 39075 Section 7.1 GR1, this is independent of the session graph.
389    pub fn set_schema(&self, name: &str) {
390        *self.current_schema.lock() = Some(name.to_string());
391    }
392
393    /// Returns the current schema name, if any.
394    ///
395    /// `None` means "not set", which resolves to the default schema.
396    #[must_use]
397    pub fn current_schema(&self) -> Option<String> {
398        self.current_schema.lock().clone()
399    }
400
401    /// Computes the effective storage key for a graph, accounting for schema context.
402    ///
403    /// Per ISO/IEC 39075 Section 17.2, graphs resolve relative to the current schema.
404    /// Uses `/` as separator since it is invalid in GQL identifiers.
405    fn effective_graph_key(&self, graph_name: &str) -> String {
406        let schema = self.current_schema.lock().clone();
407        match schema {
408            Some(s) => format!("{s}/{graph_name}"),
409            None => graph_name.to_string(),
410        }
411    }
412
413    /// Computes the effective storage key for a type, accounting for schema context.
414    ///
415    /// Mirrors `effective_graph_key()`: types resolve relative to the current schema.
416    fn effective_type_key(&self, type_name: &str) -> String {
417        let schema = self.current_schema.lock().clone();
418        match schema {
419            Some(s) => format!("{s}/{type_name}"),
420            None => type_name.to_string(),
421        }
422    }
423
424    /// Returns the effective storage key for the current graph, accounting for schema.
425    ///
426    /// Combines `current_schema` and `current_graph` into a flat lookup key.
427    fn active_graph_storage_key(&self) -> Option<String> {
428        let graph = self.current_graph.lock().clone();
429        let schema = self.current_schema.lock().clone();
430        match (&schema, &graph) {
431            (None, None) => None,
432            (Some(s), None) => Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}")),
433            (None, Some(name)) if name.eq_ignore_ascii_case("default") => None,
434            (Some(s), Some(name)) if name.eq_ignore_ascii_case("default") => {
435                Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}"))
436            }
437            (None, Some(name)) => Some(name.clone()),
438            (Some(s), Some(g)) => Some(format!("{s}/{g}")),
439        }
440    }
441
442    /// Returns the graph store for the currently active graph.
443    ///
444    /// If `current_graph` is `None` or `"default"`, returns the session's
445    /// default `graph_store` (already WAL-wrapped for the default graph).
446    /// Otherwise looks up the named graph in the root store and wraps it
447    /// in a [`WalGraphStore`] so mutations are WAL-logged with the correct
448    /// graph context.
449    fn active_store(&self) -> Arc<dyn GraphStore> {
450        let key = self.active_graph_storage_key();
451        match key {
452            None => Arc::clone(&self.graph_store),
453            #[cfg(feature = "lpg")]
454            Some(ref name) => match self.store.graph(name) {
455                Some(named_store) => {
456                    #[cfg(feature = "wal")]
457                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
458                        return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
459                            named_store,
460                            Arc::clone(wal),
461                            name.clone(),
462                            Arc::clone(ctx),
463                        )) as Arc<dyn GraphStore>;
464                    }
465                    named_store as Arc<dyn GraphStore>
466                }
467                None => Arc::clone(&self.graph_store),
468            },
469            #[cfg(not(feature = "lpg"))]
470            Some(_) => Arc::clone(&self.graph_store),
471        }
472    }
473
474    /// Returns the writable store for the active graph, if available.
475    ///
476    /// Returns `None` for read-only databases. For named graphs, wraps
477    /// the store with WAL logging when durability is enabled.
478    fn active_write_store(&self) -> Option<Arc<dyn GraphStoreMut>> {
479        let key = self.active_graph_storage_key();
480        match key {
481            None => self.graph_store_mut.as_ref().map(Arc::clone),
482            #[cfg(feature = "lpg")]
483            Some(ref name) => match self.store.graph(name) {
484                Some(named_store) => {
485                    let mut store: Arc<dyn GraphStoreMut> = named_store;
486
487                    #[cfg(feature = "wal")]
488                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
489                        store = Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
490                            // WAL needs Arc<LpgStore>, get it fresh
491                            self.store
492                                .graph(name)
493                                .unwrap_or_else(|| Arc::clone(&self.store)),
494                            Arc::clone(wal),
495                            name.clone(),
496                            Arc::clone(ctx),
497                        ));
498                    }
499
500                    #[cfg(feature = "cdc")]
501                    if let Some(ref pending) = self.cdc_pending_events {
502                        store = Arc::new(crate::database::cdc_store::CdcGraphStore::wrap(
503                            store,
504                            Arc::clone(&self.cdc_log),
505                            Arc::clone(pending),
506                        ));
507                    }
508
509                    Some(store)
510                }
511                None => self.graph_store_mut.as_ref().map(Arc::clone),
512            },
513            #[cfg(not(feature = "lpg"))]
514            Some(_) => self.graph_store_mut.as_ref().map(Arc::clone),
515        }
516    }
517
518    /// Returns the concrete `LpgStore` for the currently active graph.
519    ///
520    /// Used by direct CRUD methods that need the concrete store type
521    /// for versioned operations.
522    #[cfg(feature = "lpg")]
523    fn active_lpg_store(&self) -> Arc<LpgStore> {
524        let key = self.active_graph_storage_key();
525        match key {
526            None => Arc::clone(&self.store),
527            Some(ref name) => self
528                .store
529                .graph(name)
530                .unwrap_or_else(|| Arc::clone(&self.store)),
531        }
532    }
533
534    /// Resolves a graph name to a concrete `LpgStore`.
535    /// `None` and `"default"` resolve to the session's root store.
536    #[cfg(feature = "lpg")]
537    fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
538        match graph_name {
539            None => Arc::clone(&self.store),
540            Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
541            Some(name) => self
542                .store
543                .graph(name)
544                .unwrap_or_else(|| Arc::clone(&self.store)),
545        }
546    }
547
548    /// Records the current graph as "touched" if a transaction is active.
549    ///
550    /// Uses the full storage key (schema/graph) so that commit/rollback
551    /// can resolve the correct store via `resolve_store`.
552    fn track_graph_touch(&self) {
553        if self.current_transaction.lock().is_some() {
554            let key = self.active_graph_storage_key();
555            let mut touched = self.touched_graphs.lock();
556            if !touched.contains(&key) {
557                touched.push(key);
558            }
559        }
560    }
561
562    /// Sets the session time zone.
563    pub fn set_time_zone(&self, tz: &str) {
564        *self.time_zone.lock() = Some(tz.to_string());
565    }
566
567    /// Returns the session time zone, if set.
568    #[must_use]
569    pub fn time_zone(&self) -> Option<String> {
570        self.time_zone.lock().clone()
571    }
572
573    /// Sets a session parameter.
574    pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
575        self.session_params.lock().insert(key.to_string(), value);
576    }
577
578    /// Gets a session parameter by cloning it.
579    #[must_use]
580    pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
581        self.session_params.lock().get(key).cloned()
582    }
583
584    /// Resets all session state to defaults (ISO/IEC 39075 Section 7.2).
585    pub fn reset_session(&self) {
586        *self.current_schema.lock() = None;
587        *self.current_graph.lock() = None;
588        *self.time_zone.lock() = None;
589        self.session_params.lock().clear();
590        *self.viewing_epoch_override.lock() = None;
591    }
592
593    /// Resets only the session schema (Section 7.2 GR1).
594    pub fn reset_schema(&self) {
595        *self.current_schema.lock() = None;
596    }
597
598    /// Resets only the session graph (Section 7.2 GR2).
599    pub fn reset_graph(&self) {
600        *self.current_graph.lock() = None;
601    }
602
603    /// Resets only the session time zone (Section 7.2 GR3).
604    pub fn reset_time_zone(&self) {
605        *self.time_zone.lock() = None;
606    }
607
608    /// Resets only session parameters (Section 7.2 GR4).
609    pub fn reset_parameters(&self) {
610        self.session_params.lock().clear();
611    }
612
613    // --- Time-travel API ---
614
615    /// Sets a viewing epoch override for time-travel queries.
616    ///
617    /// While set, all queries on this session see the database as it existed
618    /// at the given epoch. Use [`clear_viewing_epoch`](Self::clear_viewing_epoch)
619    /// to return to normal behavior.
620    pub fn set_viewing_epoch(&self, epoch: EpochId) {
621        *self.viewing_epoch_override.lock() = Some(epoch);
622    }
623
624    /// Clears the viewing epoch override, returning to normal behavior.
625    pub fn clear_viewing_epoch(&self) {
626        *self.viewing_epoch_override.lock() = None;
627    }
628
629    /// Returns the current viewing epoch override, if any.
630    #[must_use]
631    pub fn viewing_epoch(&self) -> Option<EpochId> {
632        *self.viewing_epoch_override.lock()
633    }
634
635    /// Returns all versions of a node with their creation/deletion epochs.
636    ///
637    /// Properties and labels reflect the current state (not versioned per-epoch).
638    #[cfg(feature = "lpg")]
639    #[must_use]
640    pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
641        self.active_lpg_store().get_node_history(id)
642    }
643
644    /// Returns all versions of an edge with their creation/deletion epochs.
645    ///
646    /// Properties reflect the current state (not versioned per-epoch).
647    #[cfg(feature = "lpg")]
648    #[must_use]
649    pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
650        self.active_lpg_store().get_edge_history(id)
651    }
652
653    /// Checks that the session's graph model supports LPG operations.
654    fn require_lpg(&self, language: &str) -> Result<()> {
655        if self.graph_model == GraphModel::Rdf {
656            return Err(grafeo_common::utils::error::Error::Internal(format!(
657                "This is an RDF database. {language} queries require an LPG database."
658            )));
659        }
660        Ok(())
661    }
662
663    /// Executes a session or transaction command, returning an empty result.
664    #[cfg(feature = "gql")]
665    fn execute_session_command(
666        &self,
667        cmd: grafeo_adapters::query::gql::ast::SessionCommand,
668    ) -> Result<QueryResult> {
669        use grafeo_adapters::query::gql::ast::SessionCommand;
670        #[cfg(feature = "lpg")]
671        use grafeo_adapters::query::gql::ast::TransactionIsolationLevel;
672        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
673
674        // Block DDL in read-only transactions (ISO/IEC 39075 Section 8)
675        if *self.read_only_tx.lock() {
676            match &cmd {
677                SessionCommand::CreateGraph { .. } | SessionCommand::DropGraph { .. } => {
678                    return Err(Error::Transaction(
679                        grafeo_common::utils::error::TransactionError::ReadOnly,
680                    ));
681                }
682                _ => {} // Session state + transaction control allowed
683            }
684        }
685
686        match cmd {
687            #[cfg(feature = "lpg")]
688            SessionCommand::CreateGraph {
689                name,
690                if_not_exists,
691                typed,
692                like_graph,
693                copy_of,
694                open: _,
695            } => {
696                // ISO/IEC 39075 Section 12.4: graphs are created within the current schema
697                let storage_key = self.effective_graph_key(&name);
698
699                // Validate source graph exists for LIKE / AS COPY OF
700                if let Some(ref src) = like_graph {
701                    let src_key = self.effective_graph_key(src);
702                    if self.store.graph(&src_key).is_none() {
703                        return Err(Error::Query(QueryError::new(
704                            QueryErrorKind::Semantic,
705                            format!("Source graph '{src}' does not exist"),
706                        )));
707                    }
708                }
709                if let Some(ref src) = copy_of {
710                    let src_key = self.effective_graph_key(src);
711                    if self.store.graph(&src_key).is_none() {
712                        return Err(Error::Query(QueryError::new(
713                            QueryErrorKind::Semantic,
714                            format!("Source graph '{src}' does not exist"),
715                        )));
716                    }
717                }
718
719                let created = self
720                    .store
721                    .create_graph(&storage_key)
722                    .map_err(|e| Error::Internal(e.to_string()))?;
723                if !created && !if_not_exists {
724                    return Err(Error::Query(QueryError::new(
725                        QueryErrorKind::Semantic,
726                        format!("Graph '{name}' already exists"),
727                    )));
728                }
729                if created {
730                    #[cfg(feature = "wal")]
731                    self.log_schema_wal(&grafeo_storage::wal::WalRecord::CreateNamedGraph {
732                        name: storage_key.clone(),
733                    });
734                }
735
736                // AS COPY OF: copy data from source graph
737                if let Some(ref src) = copy_of {
738                    let src_key = self.effective_graph_key(src);
739                    self.store
740                        .copy_graph(Some(&src_key), Some(&storage_key))
741                        .map_err(|e| Error::Internal(e.to_string()))?;
742                }
743
744                // Bind to graph type if specified.
745                // If the parser produced a '/' in the name it is already a qualified
746                // "schema/type" key; otherwise resolve against the current schema.
747                if let Some(type_name) = typed
748                    && let Err(e) = self.catalog.bind_graph_type(
749                        &storage_key,
750                        if type_name.contains('/') {
751                            type_name.clone()
752                        } else {
753                            self.effective_type_key(&type_name)
754                        },
755                    )
756                {
757                    return Err(Error::Query(QueryError::new(
758                        QueryErrorKind::Semantic,
759                        e.to_string(),
760                    )));
761                }
762
763                // LIKE: copy graph type binding from source
764                if let Some(ref src) = like_graph {
765                    let src_key = self.effective_graph_key(src);
766                    if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
767                        let _ = self.catalog.bind_graph_type(&storage_key, src_type);
768                    }
769                }
770
771                Ok(QueryResult::empty())
772            }
773            #[cfg(feature = "lpg")]
774            SessionCommand::DropGraph { name, if_exists } => {
775                let storage_key = self.effective_graph_key(&name);
776                let dropped = self.store.drop_graph(&storage_key);
777                if !dropped && !if_exists {
778                    return Err(Error::Query(QueryError::new(
779                        QueryErrorKind::Semantic,
780                        format!("Graph '{name}' does not exist"),
781                    )));
782                }
783                if dropped {
784                    #[cfg(feature = "wal")]
785                    self.log_schema_wal(&grafeo_storage::wal::WalRecord::DropNamedGraph {
786                        name: storage_key.clone(),
787                    });
788                    // If this session was using the dropped graph, reset to default
789                    let mut current = self.current_graph.lock();
790                    if current
791                        .as_deref()
792                        .is_some_and(|g| g.eq_ignore_ascii_case(&name))
793                    {
794                        *current = None;
795                    }
796                }
797                Ok(QueryResult::empty())
798            }
799            #[cfg(feature = "lpg")]
800            SessionCommand::UseGraph(name) => {
801                // Verify graph exists (resolve within current schema)
802                let effective_key = self.effective_graph_key(&name);
803                if !name.eq_ignore_ascii_case("default")
804                    && self.store.graph(&effective_key).is_none()
805                {
806                    return Err(Error::Query(QueryError::new(
807                        QueryErrorKind::Semantic,
808                        format!("Graph '{name}' does not exist"),
809                    )));
810                }
811                self.use_graph(&name);
812                // Track the new graph if in a transaction
813                self.track_graph_touch();
814                Ok(QueryResult::empty())
815            }
816            #[cfg(feature = "lpg")]
817            SessionCommand::SessionSetGraph(name) => {
818                // ISO/IEC 39075 Section 7.1 GR2: set session graph (resolved within current schema)
819                let effective_key = self.effective_graph_key(&name);
820                if !name.eq_ignore_ascii_case("default")
821                    && self.store.graph(&effective_key).is_none()
822                {
823                    return Err(Error::Query(QueryError::new(
824                        QueryErrorKind::Semantic,
825                        format!("Graph '{name}' does not exist"),
826                    )));
827                }
828                self.use_graph(&name);
829                // Track the new graph if in a transaction
830                self.track_graph_touch();
831                Ok(QueryResult::empty())
832            }
833            SessionCommand::SessionSetSchema(name) => {
834                // ISO/IEC 39075 Section 7.1 GR1: set session schema (independent of graph)
835                if !self.catalog.schema_exists(&name) {
836                    return Err(Error::Query(QueryError::new(
837                        QueryErrorKind::Semantic,
838                        format!("Schema '{name}' does not exist"),
839                    )));
840                }
841                self.set_schema(&name);
842                Ok(QueryResult::empty())
843            }
844            SessionCommand::SessionSetTimeZone(tz) => {
845                self.set_time_zone(&tz);
846                Ok(QueryResult::empty())
847            }
848            SessionCommand::SessionSetParameter(key, expr) => {
849                if key.eq_ignore_ascii_case("viewing_epoch") {
850                    match Self::eval_integer_literal(&expr) {
851                        Some(n) if n >= 0 => {
852                            self.set_viewing_epoch(EpochId::new(n as u64));
853                            Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
854                        }
855                        _ => Err(Error::Query(QueryError::new(
856                            QueryErrorKind::Semantic,
857                            "viewing_epoch must be a non-negative integer literal",
858                        ))),
859                    }
860                } else {
861                    // For now, store parameter name with Null value.
862                    // Full expression evaluation would require building and executing a plan.
863                    self.set_parameter(&key, Value::Null);
864                    Ok(QueryResult::empty())
865                }
866            }
867            SessionCommand::SessionReset(target) => {
868                use grafeo_adapters::query::gql::ast::SessionResetTarget;
869                match target {
870                    SessionResetTarget::All => self.reset_session(),
871                    SessionResetTarget::Schema => self.reset_schema(),
872                    SessionResetTarget::Graph => self.reset_graph(),
873                    SessionResetTarget::TimeZone => self.reset_time_zone(),
874                    SessionResetTarget::Parameters => self.reset_parameters(),
875                }
876                Ok(QueryResult::empty())
877            }
878            SessionCommand::SessionClose => {
879                self.reset_session();
880                Ok(QueryResult::empty())
881            }
882            #[cfg(feature = "lpg")]
883            SessionCommand::StartTransaction {
884                read_only,
885                isolation_level,
886            } => {
887                let engine_level = isolation_level.map(|l| match l {
888                    TransactionIsolationLevel::ReadCommitted => {
889                        crate::transaction::IsolationLevel::ReadCommitted
890                    }
891                    TransactionIsolationLevel::SnapshotIsolation => {
892                        crate::transaction::IsolationLevel::SnapshotIsolation
893                    }
894                    TransactionIsolationLevel::Serializable => {
895                        crate::transaction::IsolationLevel::Serializable
896                    }
897                });
898                self.begin_transaction_inner(read_only, engine_level)?;
899                Ok(QueryResult::status("Transaction started"))
900            }
901            #[cfg(feature = "lpg")]
902            SessionCommand::Commit => {
903                self.commit_inner()?;
904                Ok(QueryResult::status("Transaction committed"))
905            }
906            #[cfg(feature = "lpg")]
907            SessionCommand::Rollback => {
908                self.rollback_inner()?;
909                Ok(QueryResult::status("Transaction rolled back"))
910            }
911            #[cfg(feature = "lpg")]
912            SessionCommand::Savepoint(name) => {
913                self.savepoint(&name)?;
914                Ok(QueryResult::status(format!("Savepoint '{name}' created")))
915            }
916            #[cfg(feature = "lpg")]
917            SessionCommand::RollbackToSavepoint(name) => {
918                self.rollback_to_savepoint(&name)?;
919                Ok(QueryResult::status(format!(
920                    "Rolled back to savepoint '{name}'"
921                )))
922            }
923            #[cfg(feature = "lpg")]
924            SessionCommand::ReleaseSavepoint(name) => {
925                self.release_savepoint(&name)?;
926                Ok(QueryResult::status(format!("Savepoint '{name}' released")))
927            }
928            #[cfg(not(feature = "lpg"))]
929            _ => Err(grafeo_common::utils::error::Error::Internal(
930                "This command requires the `lpg` feature".to_string(),
931            )),
932        }
933    }
934
935    /// Logs a WAL record for a schema change (no-op if WAL is not enabled).
936    #[cfg(feature = "wal")]
937    fn log_schema_wal(&self, record: &grafeo_storage::wal::WalRecord) {
938        if let Some(ref wal) = self.wal
939            && let Err(e) = wal.log(record)
940        {
941            grafeo_warn!("Failed to log schema change to WAL: {}", e);
942        }
943    }
944
945    /// Executes a schema DDL command, returning a status result.
946    #[cfg(all(feature = "lpg", feature = "gql"))]
947    fn execute_schema_command(
948        &self,
949        cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
950    ) -> Result<QueryResult> {
951        use crate::catalog::{
952            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
953        };
954        use grafeo_adapters::query::gql::ast::SchemaStatement;
955        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
956        #[cfg(feature = "wal")]
957        use grafeo_storage::wal::WalRecord;
958
959        /// Logs a WAL record for schema changes. Compiles to nothing without `wal`.
960        macro_rules! wal_log {
961            ($self:expr, $record:expr) => {
962                #[cfg(feature = "wal")]
963                $self.log_schema_wal(&$record);
964            };
965        }
966
967        let result = match cmd {
968            SchemaStatement::CreateNodeType(stmt) => {
969                let effective_name = self.effective_type_key(&stmt.name);
970                #[cfg(feature = "wal")]
971                let props_for_wal: Vec<(String, String, bool)> = stmt
972                    .properties
973                    .iter()
974                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
975                    .collect();
976                let def = NodeTypeDefinition {
977                    name: effective_name.clone(),
978                    properties: stmt
979                        .properties
980                        .iter()
981                        .map(|p| TypedProperty {
982                            name: p.name.clone(),
983                            data_type: PropertyDataType::from_type_name(&p.data_type),
984                            nullable: p.nullable,
985                            default_value: p
986                                .default_value
987                                .as_ref()
988                                .map(|s| parse_default_literal(s)),
989                        })
990                        .collect(),
991                    constraints: Vec::new(),
992                    parent_types: stmt.parent_types.clone(),
993                };
994                let result = if stmt.or_replace {
995                    let _ = self.catalog.drop_node_type(&effective_name);
996                    self.catalog.register_node_type(def)
997                } else {
998                    self.catalog.register_node_type(def)
999                };
1000                match result {
1001                    Ok(()) => {
1002                        wal_log!(
1003                            self,
1004                            WalRecord::CreateNodeType {
1005                                name: effective_name.clone(),
1006                                properties: props_for_wal,
1007                                constraints: Vec::new(),
1008                            }
1009                        );
1010                        Ok(QueryResult::status(format!(
1011                            "Created node type '{}'",
1012                            stmt.name
1013                        )))
1014                    }
1015                    Err(e) if stmt.if_not_exists => {
1016                        let _ = e;
1017                        Ok(QueryResult::status("No change"))
1018                    }
1019                    Err(e) => Err(Error::Query(QueryError::new(
1020                        QueryErrorKind::Semantic,
1021                        e.to_string(),
1022                    ))),
1023                }
1024            }
1025            SchemaStatement::CreateEdgeType(stmt) => {
1026                let effective_name = self.effective_type_key(&stmt.name);
1027                #[cfg(feature = "wal")]
1028                let props_for_wal: Vec<(String, String, bool)> = stmt
1029                    .properties
1030                    .iter()
1031                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1032                    .collect();
1033                let def = EdgeTypeDefinition {
1034                    name: effective_name.clone(),
1035                    properties: stmt
1036                        .properties
1037                        .iter()
1038                        .map(|p| TypedProperty {
1039                            name: p.name.clone(),
1040                            data_type: PropertyDataType::from_type_name(&p.data_type),
1041                            nullable: p.nullable,
1042                            default_value: p
1043                                .default_value
1044                                .as_ref()
1045                                .map(|s| parse_default_literal(s)),
1046                        })
1047                        .collect(),
1048                    constraints: Vec::new(),
1049                    source_node_types: stmt.source_node_types.clone(),
1050                    target_node_types: stmt.target_node_types.clone(),
1051                };
1052                let result = if stmt.or_replace {
1053                    let _ = self.catalog.drop_edge_type_def(&effective_name);
1054                    self.catalog.register_edge_type_def(def)
1055                } else {
1056                    self.catalog.register_edge_type_def(def)
1057                };
1058                match result {
1059                    Ok(()) => {
1060                        wal_log!(
1061                            self,
1062                            WalRecord::CreateEdgeType {
1063                                name: effective_name.clone(),
1064                                properties: props_for_wal,
1065                                constraints: Vec::new(),
1066                            }
1067                        );
1068                        Ok(QueryResult::status(format!(
1069                            "Created edge type '{}'",
1070                            stmt.name
1071                        )))
1072                    }
1073                    Err(e) if stmt.if_not_exists => {
1074                        let _ = e;
1075                        Ok(QueryResult::status("No change"))
1076                    }
1077                    Err(e) => Err(Error::Query(QueryError::new(
1078                        QueryErrorKind::Semantic,
1079                        e.to_string(),
1080                    ))),
1081                }
1082            }
1083            SchemaStatement::CreateVectorIndex(stmt) => {
1084                Self::create_vector_index_on_store(
1085                    &self.active_lpg_store(),
1086                    &stmt.node_label,
1087                    &stmt.property,
1088                    stmt.dimensions,
1089                    stmt.metric.as_deref(),
1090                )?;
1091                wal_log!(
1092                    self,
1093                    WalRecord::CreateIndex {
1094                        name: stmt.name.clone(),
1095                        label: stmt.node_label.clone(),
1096                        property: stmt.property.clone(),
1097                        index_type: "vector".to_string(),
1098                    }
1099                );
1100                Ok(QueryResult::status(format!(
1101                    "Created vector index '{}'",
1102                    stmt.name
1103                )))
1104            }
1105            SchemaStatement::DropNodeType { name, if_exists } => {
1106                let effective_name = self.effective_type_key(&name);
1107                match self.catalog.drop_node_type(&effective_name) {
1108                    Ok(()) => {
1109                        wal_log!(
1110                            self,
1111                            WalRecord::DropNodeType {
1112                                name: effective_name
1113                            }
1114                        );
1115                        Ok(QueryResult::status(format!("Dropped node type '{name}'")))
1116                    }
1117                    Err(e) if if_exists => {
1118                        let _ = e;
1119                        Ok(QueryResult::status("No change"))
1120                    }
1121                    Err(e) => Err(Error::Query(QueryError::new(
1122                        QueryErrorKind::Semantic,
1123                        e.to_string(),
1124                    ))),
1125                }
1126            }
1127            SchemaStatement::DropEdgeType { name, if_exists } => {
1128                let effective_name = self.effective_type_key(&name);
1129                match self.catalog.drop_edge_type_def(&effective_name) {
1130                    Ok(()) => {
1131                        wal_log!(
1132                            self,
1133                            WalRecord::DropEdgeType {
1134                                name: effective_name
1135                            }
1136                        );
1137                        Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
1138                    }
1139                    Err(e) if if_exists => {
1140                        let _ = e;
1141                        Ok(QueryResult::status("No change"))
1142                    }
1143                    Err(e) => Err(Error::Query(QueryError::new(
1144                        QueryErrorKind::Semantic,
1145                        e.to_string(),
1146                    ))),
1147                }
1148            }
1149            SchemaStatement::CreateIndex(stmt) => {
1150                use crate::catalog::IndexType as CatalogIndexType;
1151                use grafeo_adapters::query::gql::ast::IndexKind;
1152                let active = self.active_lpg_store();
1153                let index_type_str = match stmt.index_kind {
1154                    IndexKind::Property => "property",
1155                    IndexKind::BTree => "btree",
1156                    IndexKind::Text => "text",
1157                    IndexKind::Vector => "vector",
1158                };
1159                match stmt.index_kind {
1160                    IndexKind::Property | IndexKind::BTree => {
1161                        for prop in &stmt.properties {
1162                            active.create_property_index(prop);
1163                        }
1164                    }
1165                    IndexKind::Text => {
1166                        for prop in &stmt.properties {
1167                            Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1168                        }
1169                    }
1170                    IndexKind::Vector => {
1171                        for prop in &stmt.properties {
1172                            Self::create_vector_index_on_store(
1173                                &active,
1174                                &stmt.label,
1175                                prop,
1176                                stmt.options.dimensions,
1177                                stmt.options.metric.as_deref(),
1178                            )?;
1179                        }
1180                    }
1181                }
1182                // Register each property index in the catalog for SHOW INDEXES
1183                // and DROP INDEX name-based lookup.
1184                let catalog_index_type = match stmt.index_kind {
1185                    IndexKind::Property => CatalogIndexType::Hash,
1186                    IndexKind::BTree => CatalogIndexType::BTree,
1187                    IndexKind::Text => CatalogIndexType::FullText,
1188                    IndexKind::Vector => CatalogIndexType::Hash,
1189                };
1190                let label_id = self.catalog.get_or_create_label(&stmt.label);
1191                for prop in &stmt.properties {
1192                    let prop_id = self.catalog.get_or_create_property_key(prop);
1193                    self.catalog
1194                        .create_index(&stmt.name, label_id, prop_id, catalog_index_type);
1195                }
1196                #[cfg(feature = "wal")]
1197                for prop in &stmt.properties {
1198                    wal_log!(
1199                        self,
1200                        WalRecord::CreateIndex {
1201                            name: stmt.name.clone(),
1202                            label: stmt.label.clone(),
1203                            property: prop.clone(),
1204                            index_type: index_type_str.to_string(),
1205                        }
1206                    );
1207                }
1208                Ok(QueryResult::status(format!(
1209                    "Created {} index '{}'",
1210                    index_type_str, stmt.name
1211                )))
1212            }
1213            SchemaStatement::DropIndex { name, if_exists } => {
1214                // Look up the index by name in the catalog to find the
1215                // underlying property, then drop from both catalog and store.
1216                if let Some(index_id) = self.catalog.find_index_by_name(&name) {
1217                    let def = self.catalog.get_index(index_id);
1218                    self.catalog.drop_index(index_id);
1219                    if let Some(def) = def
1220                        && let Some(prop_name) =
1221                            self.catalog.get_property_key_name(def.property_key)
1222                    {
1223                        self.active_lpg_store().drop_property_index(&prop_name);
1224                    }
1225                    wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1226                    Ok(QueryResult::status(format!("Dropped index '{name}'")))
1227                } else if if_exists {
1228                    Ok(QueryResult::status("No change".to_string()))
1229                } else {
1230                    Err(Error::Query(QueryError::new(
1231                        QueryErrorKind::Semantic,
1232                        format!("Index '{name}' does not exist"),
1233                    )))
1234                }
1235            }
1236            SchemaStatement::CreateConstraint(stmt) => {
1237                use crate::catalog::TypeConstraint;
1238                use grafeo_adapters::query::gql::ast::ConstraintKind;
1239                let kind_str = match stmt.constraint_kind {
1240                    ConstraintKind::Unique => "unique",
1241                    ConstraintKind::NodeKey => "node_key",
1242                    ConstraintKind::NotNull => "not_null",
1243                    ConstraintKind::Exists => "exists",
1244                };
1245                let constraint_name = stmt
1246                    .name
1247                    .clone()
1248                    .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1249
1250                // Register constraint in catalog type definitions
1251                match stmt.constraint_kind {
1252                    ConstraintKind::Unique => {
1253                        for prop in &stmt.properties {
1254                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1255                            let prop_id = self.catalog.get_or_create_property_key(prop);
1256                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1257                        }
1258                        let _ = self.catalog.add_constraint_to_type(
1259                            &stmt.label,
1260                            TypeConstraint::Unique(stmt.properties.clone()),
1261                        );
1262                    }
1263                    ConstraintKind::NodeKey => {
1264                        for prop in &stmt.properties {
1265                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1266                            let prop_id = self.catalog.get_or_create_property_key(prop);
1267                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1268                            let _ = self.catalog.add_required_property(label_id, prop_id);
1269                        }
1270                        let _ = self.catalog.add_constraint_to_type(
1271                            &stmt.label,
1272                            TypeConstraint::PrimaryKey(stmt.properties.clone()),
1273                        );
1274                    }
1275                    ConstraintKind::NotNull | ConstraintKind::Exists => {
1276                        for prop in &stmt.properties {
1277                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1278                            let prop_id = self.catalog.get_or_create_property_key(prop);
1279                            let _ = self.catalog.add_required_property(label_id, prop_id);
1280                            let _ = self.catalog.add_constraint_to_type(
1281                                &stmt.label,
1282                                TypeConstraint::NotNull(prop.clone()),
1283                            );
1284                        }
1285                    }
1286                }
1287
1288                wal_log!(
1289                    self,
1290                    WalRecord::CreateConstraint {
1291                        name: constraint_name.clone(),
1292                        label: stmt.label.clone(),
1293                        properties: stmt.properties.clone(),
1294                        kind: kind_str.to_string(),
1295                    }
1296                );
1297                Ok(QueryResult::status(format!(
1298                    "Created {kind_str} constraint '{constraint_name}'"
1299                )))
1300            }
1301            SchemaStatement::DropConstraint { name, if_exists } => {
1302                let _ = if_exists;
1303                wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1304                Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1305            }
1306            SchemaStatement::CreateGraphType(stmt) => {
1307                use crate::catalog::GraphTypeDefinition;
1308                use grafeo_adapters::query::gql::ast::InlineElementType;
1309
1310                let effective_name = self.effective_type_key(&stmt.name);
1311
1312                // GG04: LIKE clause copies type from existing graph
1313                let (mut node_types, mut edge_types, open) =
1314                    if let Some(ref like_graph) = stmt.like_graph {
1315                        // Infer types from the graph's bound type, or use its existing types
1316                        if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1317                            if let Some(existing) = self
1318                                .catalog
1319                                .schema()
1320                                .and_then(|s| s.get_graph_type(&type_name))
1321                            {
1322                                (
1323                                    existing.allowed_node_types.clone(),
1324                                    existing.allowed_edge_types.clone(),
1325                                    existing.open,
1326                                )
1327                            } else {
1328                                (Vec::new(), Vec::new(), true)
1329                            }
1330                        } else {
1331                            // GG22: Infer from graph data (labels used in graph)
1332                            let nt = self.catalog.all_node_type_names();
1333                            let et = self.catalog.all_edge_type_names();
1334                            if nt.is_empty() && et.is_empty() {
1335                                (Vec::new(), Vec::new(), true)
1336                            } else {
1337                                (nt, et, false)
1338                            }
1339                        }
1340                    } else {
1341                        // Prefix element type names with schema for consistency
1342                        let nt = stmt
1343                            .node_types
1344                            .iter()
1345                            .map(|n| self.effective_type_key(n))
1346                            .collect();
1347                        let et = stmt
1348                            .edge_types
1349                            .iter()
1350                            .map(|n| self.effective_type_key(n))
1351                            .collect();
1352                        (nt, et, stmt.open)
1353                    };
1354
1355                // GG03: Register inline element types and add their names
1356                for inline in &stmt.inline_types {
1357                    match inline {
1358                        InlineElementType::Node {
1359                            name,
1360                            properties,
1361                            key_labels,
1362                            ..
1363                        } => {
1364                            let inline_effective = self.effective_type_key(name);
1365                            let def = NodeTypeDefinition {
1366                                name: inline_effective.clone(),
1367                                properties: properties
1368                                    .iter()
1369                                    .map(|p| TypedProperty {
1370                                        name: p.name.clone(),
1371                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1372                                        nullable: p.nullable,
1373                                        default_value: None,
1374                                    })
1375                                    .collect(),
1376                                constraints: Vec::new(),
1377                                parent_types: key_labels.clone(),
1378                            };
1379                            // Register or replace so inline defs override existing
1380                            self.catalog.register_or_replace_node_type(def);
1381                            #[cfg(feature = "wal")]
1382                            {
1383                                let props_for_wal: Vec<(String, String, bool)> = properties
1384                                    .iter()
1385                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1386                                    .collect();
1387                                self.log_schema_wal(&WalRecord::CreateNodeType {
1388                                    name: inline_effective.clone(),
1389                                    properties: props_for_wal,
1390                                    constraints: Vec::new(),
1391                                });
1392                            }
1393                            if !node_types.contains(&inline_effective) {
1394                                node_types.push(inline_effective);
1395                            }
1396                        }
1397                        InlineElementType::Edge {
1398                            name,
1399                            properties,
1400                            source_node_types,
1401                            target_node_types,
1402                            ..
1403                        } => {
1404                            let inline_effective = self.effective_type_key(name);
1405                            let def = EdgeTypeDefinition {
1406                                name: inline_effective.clone(),
1407                                properties: properties
1408                                    .iter()
1409                                    .map(|p| TypedProperty {
1410                                        name: p.name.clone(),
1411                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1412                                        nullable: p.nullable,
1413                                        default_value: None,
1414                                    })
1415                                    .collect(),
1416                                constraints: Vec::new(),
1417                                source_node_types: source_node_types.clone(),
1418                                target_node_types: target_node_types.clone(),
1419                            };
1420                            self.catalog.register_or_replace_edge_type_def(def);
1421                            #[cfg(feature = "wal")]
1422                            {
1423                                let props_for_wal: Vec<(String, String, bool)> = properties
1424                                    .iter()
1425                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1426                                    .collect();
1427                                self.log_schema_wal(&WalRecord::CreateEdgeType {
1428                                    name: inline_effective.clone(),
1429                                    properties: props_for_wal,
1430                                    constraints: Vec::new(),
1431                                });
1432                            }
1433                            if !edge_types.contains(&inline_effective) {
1434                                edge_types.push(inline_effective);
1435                            }
1436                        }
1437                    }
1438                }
1439
1440                let def = GraphTypeDefinition {
1441                    name: effective_name.clone(),
1442                    allowed_node_types: node_types.clone(),
1443                    allowed_edge_types: edge_types.clone(),
1444                    open,
1445                };
1446                let result = if stmt.or_replace {
1447                    // Drop existing first, ignore error if not found
1448                    let _ = self.catalog.drop_graph_type(&effective_name);
1449                    self.catalog.register_graph_type(def)
1450                } else {
1451                    self.catalog.register_graph_type(def)
1452                };
1453                match result {
1454                    Ok(()) => {
1455                        wal_log!(
1456                            self,
1457                            WalRecord::CreateGraphType {
1458                                name: effective_name.clone(),
1459                                node_types,
1460                                edge_types,
1461                                open,
1462                            }
1463                        );
1464                        Ok(QueryResult::status(format!(
1465                            "Created graph type '{}'",
1466                            stmt.name
1467                        )))
1468                    }
1469                    Err(e) if stmt.if_not_exists => {
1470                        let _ = e;
1471                        Ok(QueryResult::status("No change"))
1472                    }
1473                    Err(e) => Err(Error::Query(QueryError::new(
1474                        QueryErrorKind::Semantic,
1475                        e.to_string(),
1476                    ))),
1477                }
1478            }
1479            SchemaStatement::DropGraphType { name, if_exists } => {
1480                let effective_name = self.effective_type_key(&name);
1481                match self.catalog.drop_graph_type(&effective_name) {
1482                    Ok(()) => {
1483                        wal_log!(
1484                            self,
1485                            WalRecord::DropGraphType {
1486                                name: effective_name
1487                            }
1488                        );
1489                        Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1490                    }
1491                    Err(e) if if_exists => {
1492                        let _ = e;
1493                        Ok(QueryResult::status("No change"))
1494                    }
1495                    Err(e) => Err(Error::Query(QueryError::new(
1496                        QueryErrorKind::Semantic,
1497                        e.to_string(),
1498                    ))),
1499                }
1500            }
1501            SchemaStatement::CreateSchema {
1502                name,
1503                if_not_exists,
1504            } => match self.catalog.register_schema_namespace(name.clone()) {
1505                Ok(()) => {
1506                    wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1507                    // Auto-create the schema's default graph partition so that
1508                    // SESSION SET SCHEMA + queries work without an explicit graph.
1509                    let default_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1510                    if self.store.create_graph(&default_key).unwrap_or(false) {
1511                        wal_log!(self, WalRecord::CreateNamedGraph { name: default_key });
1512                    }
1513                    Ok(QueryResult::status(format!("Created schema '{name}'")))
1514                }
1515                Err(e) if if_not_exists => {
1516                    let _ = e;
1517                    Ok(QueryResult::status("No change"))
1518                }
1519                Err(e) => Err(Error::Query(QueryError::new(
1520                    QueryErrorKind::Semantic,
1521                    e.to_string(),
1522                ))),
1523            },
1524            SchemaStatement::DropSchema { name, if_exists } => {
1525                // ISO/IEC 39075 Section 12.3: schema must be empty before dropping.
1526                // The auto-created __default__ graph is exempt from the check.
1527                let prefix = format!("{name}/");
1528                let default_graph_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1529                let has_graphs = self
1530                    .store
1531                    .graph_names()
1532                    .iter()
1533                    .any(|g| g.starts_with(&prefix) && *g != default_graph_key);
1534                let has_types = self
1535                    .catalog
1536                    .all_node_type_names()
1537                    .iter()
1538                    .any(|n| n.starts_with(&prefix))
1539                    || self
1540                        .catalog
1541                        .all_edge_type_names()
1542                        .iter()
1543                        .any(|n| n.starts_with(&prefix))
1544                    || self
1545                        .catalog
1546                        .all_graph_type_names()
1547                        .iter()
1548                        .any(|n| n.starts_with(&prefix));
1549                if has_graphs || has_types {
1550                    return Err(Error::Query(QueryError::new(
1551                        QueryErrorKind::Semantic,
1552                        format!("Schema '{name}' is not empty: drop all graphs and types first"),
1553                    )));
1554                }
1555                match self.catalog.drop_schema_namespace(&name) {
1556                    Ok(()) => {
1557                        wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1558                        // Drop the auto-created default graph partition
1559                        if self.store.drop_graph(&default_graph_key) {
1560                            wal_log!(
1561                                self,
1562                                WalRecord::DropNamedGraph {
1563                                    name: default_graph_key,
1564                                }
1565                            );
1566                        }
1567                        // If this session was using the dropped schema, reset it
1568                        let mut current = self.current_schema.lock();
1569                        if current
1570                            .as_deref()
1571                            .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1572                        {
1573                            *current = None;
1574                        }
1575                        Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1576                    }
1577                    Err(e) if if_exists => {
1578                        let _ = e;
1579                        Ok(QueryResult::status("No change"))
1580                    }
1581                    Err(e) => Err(Error::Query(QueryError::new(
1582                        QueryErrorKind::Semantic,
1583                        e.to_string(),
1584                    ))),
1585                }
1586            }
1587            SchemaStatement::AlterNodeType(stmt) => {
1588                use grafeo_adapters::query::gql::ast::TypeAlteration;
1589                let effective_name = self.effective_type_key(&stmt.name);
1590                let mut wal_alts = Vec::new();
1591                for alt in &stmt.alterations {
1592                    match alt {
1593                        TypeAlteration::AddProperty(prop) => {
1594                            let typed = TypedProperty {
1595                                name: prop.name.clone(),
1596                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1597                                nullable: prop.nullable,
1598                                default_value: prop
1599                                    .default_value
1600                                    .as_ref()
1601                                    .map(|s| parse_default_literal(s)),
1602                            };
1603                            self.catalog
1604                                .alter_node_type_add_property(&effective_name, typed)
1605                                .map_err(|e| {
1606                                    Error::Query(QueryError::new(
1607                                        QueryErrorKind::Semantic,
1608                                        e.to_string(),
1609                                    ))
1610                                })?;
1611                            wal_alts.push((
1612                                "add".to_string(),
1613                                prop.name.clone(),
1614                                prop.data_type.clone(),
1615                                prop.nullable,
1616                            ));
1617                        }
1618                        TypeAlteration::DropProperty(name) => {
1619                            self.catalog
1620                                .alter_node_type_drop_property(&effective_name, name)
1621                                .map_err(|e| {
1622                                    Error::Query(QueryError::new(
1623                                        QueryErrorKind::Semantic,
1624                                        e.to_string(),
1625                                    ))
1626                                })?;
1627                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1628                        }
1629                    }
1630                }
1631                wal_log!(
1632                    self,
1633                    WalRecord::AlterNodeType {
1634                        name: effective_name,
1635                        alterations: wal_alts,
1636                    }
1637                );
1638                Ok(QueryResult::status(format!(
1639                    "Altered node type '{}'",
1640                    stmt.name
1641                )))
1642            }
1643            SchemaStatement::AlterEdgeType(stmt) => {
1644                use grafeo_adapters::query::gql::ast::TypeAlteration;
1645                let effective_name = self.effective_type_key(&stmt.name);
1646                let mut wal_alts = Vec::new();
1647                for alt in &stmt.alterations {
1648                    match alt {
1649                        TypeAlteration::AddProperty(prop) => {
1650                            let typed = TypedProperty {
1651                                name: prop.name.clone(),
1652                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1653                                nullable: prop.nullable,
1654                                default_value: prop
1655                                    .default_value
1656                                    .as_ref()
1657                                    .map(|s| parse_default_literal(s)),
1658                            };
1659                            self.catalog
1660                                .alter_edge_type_add_property(&effective_name, typed)
1661                                .map_err(|e| {
1662                                    Error::Query(QueryError::new(
1663                                        QueryErrorKind::Semantic,
1664                                        e.to_string(),
1665                                    ))
1666                                })?;
1667                            wal_alts.push((
1668                                "add".to_string(),
1669                                prop.name.clone(),
1670                                prop.data_type.clone(),
1671                                prop.nullable,
1672                            ));
1673                        }
1674                        TypeAlteration::DropProperty(name) => {
1675                            self.catalog
1676                                .alter_edge_type_drop_property(&effective_name, name)
1677                                .map_err(|e| {
1678                                    Error::Query(QueryError::new(
1679                                        QueryErrorKind::Semantic,
1680                                        e.to_string(),
1681                                    ))
1682                                })?;
1683                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1684                        }
1685                    }
1686                }
1687                wal_log!(
1688                    self,
1689                    WalRecord::AlterEdgeType {
1690                        name: effective_name,
1691                        alterations: wal_alts,
1692                    }
1693                );
1694                Ok(QueryResult::status(format!(
1695                    "Altered edge type '{}'",
1696                    stmt.name
1697                )))
1698            }
1699            SchemaStatement::AlterGraphType(stmt) => {
1700                use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1701                let effective_name = self.effective_type_key(&stmt.name);
1702                let mut wal_alts = Vec::new();
1703                for alt in &stmt.alterations {
1704                    match alt {
1705                        GraphTypeAlteration::AddNodeType(name) => {
1706                            self.catalog
1707                                .alter_graph_type_add_node_type(&effective_name, name.clone())
1708                                .map_err(|e| {
1709                                    Error::Query(QueryError::new(
1710                                        QueryErrorKind::Semantic,
1711                                        e.to_string(),
1712                                    ))
1713                                })?;
1714                            wal_alts.push(("add_node_type".to_string(), name.clone()));
1715                        }
1716                        GraphTypeAlteration::DropNodeType(name) => {
1717                            self.catalog
1718                                .alter_graph_type_drop_node_type(&effective_name, name)
1719                                .map_err(|e| {
1720                                    Error::Query(QueryError::new(
1721                                        QueryErrorKind::Semantic,
1722                                        e.to_string(),
1723                                    ))
1724                                })?;
1725                            wal_alts.push(("drop_node_type".to_string(), name.clone()));
1726                        }
1727                        GraphTypeAlteration::AddEdgeType(name) => {
1728                            self.catalog
1729                                .alter_graph_type_add_edge_type(&effective_name, name.clone())
1730                                .map_err(|e| {
1731                                    Error::Query(QueryError::new(
1732                                        QueryErrorKind::Semantic,
1733                                        e.to_string(),
1734                                    ))
1735                                })?;
1736                            wal_alts.push(("add_edge_type".to_string(), name.clone()));
1737                        }
1738                        GraphTypeAlteration::DropEdgeType(name) => {
1739                            self.catalog
1740                                .alter_graph_type_drop_edge_type(&effective_name, name)
1741                                .map_err(|e| {
1742                                    Error::Query(QueryError::new(
1743                                        QueryErrorKind::Semantic,
1744                                        e.to_string(),
1745                                    ))
1746                                })?;
1747                            wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1748                        }
1749                    }
1750                }
1751                wal_log!(
1752                    self,
1753                    WalRecord::AlterGraphType {
1754                        name: effective_name,
1755                        alterations: wal_alts,
1756                    }
1757                );
1758                Ok(QueryResult::status(format!(
1759                    "Altered graph type '{}'",
1760                    stmt.name
1761                )))
1762            }
1763            SchemaStatement::CreateProcedure(stmt) => {
1764                use crate::catalog::ProcedureDefinition;
1765
1766                let def = ProcedureDefinition {
1767                    name: stmt.name.clone(),
1768                    params: stmt
1769                        .params
1770                        .iter()
1771                        .map(|p| (p.name.clone(), p.param_type.clone()))
1772                        .collect(),
1773                    returns: stmt
1774                        .returns
1775                        .iter()
1776                        .map(|r| (r.name.clone(), r.return_type.clone()))
1777                        .collect(),
1778                    body: stmt.body.clone(),
1779                };
1780
1781                if stmt.or_replace {
1782                    self.catalog.replace_procedure(def).map_err(|e| {
1783                        Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1784                    })?;
1785                } else {
1786                    match self.catalog.register_procedure(def) {
1787                        Ok(()) => {}
1788                        Err(_) if stmt.if_not_exists => {
1789                            return Ok(QueryResult::empty());
1790                        }
1791                        Err(e) => {
1792                            return Err(Error::Query(QueryError::new(
1793                                QueryErrorKind::Semantic,
1794                                e.to_string(),
1795                            )));
1796                        }
1797                    }
1798                }
1799
1800                wal_log!(
1801                    self,
1802                    WalRecord::CreateProcedure {
1803                        name: stmt.name.clone(),
1804                        params: stmt
1805                            .params
1806                            .iter()
1807                            .map(|p| (p.name.clone(), p.param_type.clone()))
1808                            .collect(),
1809                        returns: stmt
1810                            .returns
1811                            .iter()
1812                            .map(|r| (r.name.clone(), r.return_type.clone()))
1813                            .collect(),
1814                        body: stmt.body,
1815                    }
1816                );
1817                Ok(QueryResult::status(format!(
1818                    "Created procedure '{}'",
1819                    stmt.name
1820                )))
1821            }
1822            SchemaStatement::DropProcedure { name, if_exists } => {
1823                match self.catalog.drop_procedure(&name) {
1824                    Ok(()) => {}
1825                    Err(_) if if_exists => {
1826                        return Ok(QueryResult::empty());
1827                    }
1828                    Err(e) => {
1829                        return Err(Error::Query(QueryError::new(
1830                            QueryErrorKind::Semantic,
1831                            e.to_string(),
1832                        )));
1833                    }
1834                }
1835                wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1836                Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1837            }
1838            SchemaStatement::ShowIndexes => {
1839                return self.execute_show_indexes();
1840            }
1841            SchemaStatement::ShowConstraints => {
1842                return self.execute_show_constraints();
1843            }
1844            SchemaStatement::ShowNodeTypes => {
1845                return self.execute_show_node_types();
1846            }
1847            SchemaStatement::ShowEdgeTypes => {
1848                return self.execute_show_edge_types();
1849            }
1850            SchemaStatement::ShowGraphTypes => {
1851                return self.execute_show_graph_types();
1852            }
1853            SchemaStatement::ShowGraphType(name) => {
1854                return self.execute_show_graph_type(&name);
1855            }
1856            SchemaStatement::ShowCurrentGraphType => {
1857                return self.execute_show_current_graph_type();
1858            }
1859            SchemaStatement::ShowGraphs => {
1860                return self.execute_show_graphs();
1861            }
1862            SchemaStatement::ShowSchemas => {
1863                return self.execute_show_schemas();
1864            }
1865        };
1866
1867        // Invalidate all cached query plans after any successful DDL change.
1868        // DDL is rare, so clearing the entire cache is cheap and correct.
1869        if result.is_ok() {
1870            self.query_cache.clear();
1871        }
1872
1873        result
1874    }
1875
1876    /// Creates a vector index on the store by scanning existing nodes.
1877    #[cfg(all(feature = "lpg", feature = "gql", feature = "vector-index"))]
1878    fn create_vector_index_on_store(
1879        store: &LpgStore,
1880        label: &str,
1881        property: &str,
1882        dimensions: Option<usize>,
1883        metric: Option<&str>,
1884    ) -> Result<()> {
1885        use grafeo_common::types::{PropertyKey, Value};
1886        use grafeo_common::utils::error::Error;
1887        use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};
1888
1889        let metric = match metric {
1890            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1891                Error::Internal(format!(
1892                    "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
1893                ))
1894            })?,
1895            None => DistanceMetric::Cosine,
1896        };
1897
1898        let prop_key = PropertyKey::new(property);
1899        let mut found_dims: Option<usize> = dimensions;
1900        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1901
1902        for node in store.nodes_with_label(label) {
1903            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1904                if let Some(expected) = found_dims {
1905                    if v.len() != expected {
1906                        return Err(Error::Internal(format!(
1907                            "Vector dimension mismatch: expected {expected}, found {} on node {}",
1908                            v.len(),
1909                            node.id.0
1910                        )));
1911                    }
1912                } else {
1913                    found_dims = Some(v.len());
1914                }
1915                vectors.push((node.id, v.to_vec()));
1916            }
1917        }
1918
1919        let Some(dims) = found_dims else {
1920            return Err(Error::Internal(format!(
1921                "No vector properties found on :{label}({property}) and no dimensions specified"
1922            )));
1923        };
1924
1925        let config = HnswConfig::new(dims, metric);
1926        let index = HnswIndex::with_capacity(config, vectors.len());
1927        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
1928        for (node_id, vec) in &vectors {
1929            index.insert(*node_id, vec, &accessor);
1930        }
1931
1932        store.add_vector_index(label, property, Arc::new(index));
1933        Ok(())
1934    }
1935
1936    /// Stub for when vector-index feature is not enabled.
1937    #[cfg(all(feature = "lpg", feature = "gql", not(feature = "vector-index")))]
1938    fn create_vector_index_on_store(
1939        _store: &LpgStore,
1940        _label: &str,
1941        _property: &str,
1942        _dimensions: Option<usize>,
1943        _metric: Option<&str>,
1944    ) -> Result<()> {
1945        Err(grafeo_common::utils::error::Error::Internal(
1946            "Vector index support requires the 'vector-index' feature".to_string(),
1947        ))
1948    }
1949
1950    /// Creates a text index on the store by scanning existing nodes.
1951    #[cfg(all(feature = "lpg", feature = "gql", feature = "text-index"))]
1952    fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
1953        use grafeo_common::types::{PropertyKey, Value};
1954        use grafeo_core::index::text::{BM25Config, InvertedIndex};
1955
1956        let mut index = InvertedIndex::new(BM25Config::default());
1957        let prop_key = PropertyKey::new(property);
1958
1959        let nodes = store.nodes_by_label(label);
1960        for node_id in nodes {
1961            if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
1962                index.insert(node_id, text.as_str());
1963            }
1964        }
1965
1966        store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
1967        Ok(())
1968    }
1969
1970    /// Stub for when text-index feature is not enabled.
1971    #[cfg(all(feature = "lpg", feature = "gql", not(feature = "text-index")))]
1972    fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
1973        Err(grafeo_common::utils::error::Error::Internal(
1974            "Text index support requires the 'text-index' feature".to_string(),
1975        ))
1976    }
1977
1978    /// Returns a table of all indexes from the catalog.
1979    fn execute_show_indexes(&self) -> Result<QueryResult> {
1980        let indexes = self.catalog.all_indexes();
1981        let columns = vec![
1982            "name".to_string(),
1983            "type".to_string(),
1984            "label".to_string(),
1985            "property".to_string(),
1986        ];
1987        let rows: Vec<Vec<Value>> = indexes
1988            .into_iter()
1989            .map(|def| {
1990                let label_name = self
1991                    .catalog
1992                    .get_label_name(def.label)
1993                    .unwrap_or_else(|| "?".into());
1994                let prop_name = self
1995                    .catalog
1996                    .get_property_key_name(def.property_key)
1997                    .unwrap_or_else(|| "?".into());
1998                vec![
1999                    Value::from(def.name),
2000                    Value::from(format!("{:?}", def.index_type)),
2001                    Value::from(&*label_name),
2002                    Value::from(&*prop_name),
2003                ]
2004            })
2005            .collect();
2006        Ok(QueryResult {
2007            columns,
2008            column_types: Vec::new(),
2009            rows,
2010            ..QueryResult::empty()
2011        })
2012    }
2013
2014    /// Returns a table of all constraints (currently metadata-only).
2015    fn execute_show_constraints(&self) -> Result<QueryResult> {
2016        // Constraints are tracked in WAL but not yet in a queryable catalog.
2017        // Return an empty table with the expected schema.
2018        Ok(QueryResult {
2019            columns: vec![
2020                "name".to_string(),
2021                "type".to_string(),
2022                "label".to_string(),
2023                "properties".to_string(),
2024            ],
2025            column_types: Vec::new(),
2026            rows: Vec::new(),
2027            ..QueryResult::empty()
2028        })
2029    }
2030
2031    /// Returns a table of all registered node types in the current schema.
2032    fn execute_show_node_types(&self) -> Result<QueryResult> {
2033        let columns = vec![
2034            "name".to_string(),
2035            "properties".to_string(),
2036            "constraints".to_string(),
2037            "parents".to_string(),
2038        ];
2039        let schema = self.current_schema.lock().clone();
2040        let all_names = self.catalog.all_node_type_names();
2041        let type_names: Vec<String> = match &schema {
2042            Some(s) => {
2043                let prefix = format!("{s}/");
2044                all_names
2045                    .into_iter()
2046                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2047                    .collect()
2048            }
2049            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2050        };
2051        let rows: Vec<Vec<Value>> = type_names
2052            .into_iter()
2053            .filter_map(|name| {
2054                let lookup = match &schema {
2055                    Some(s) => format!("{s}/{name}"),
2056                    None => name.clone(),
2057                };
2058                let def = self.catalog.get_node_type(&lookup)?;
2059                let props: Vec<String> = def
2060                    .properties
2061                    .iter()
2062                    .map(|p| {
2063                        let nullable = if p.nullable { "" } else { " NOT NULL" };
2064                        format!("{} {}{}", p.name, p.data_type, nullable)
2065                    })
2066                    .collect();
2067                let constraints: Vec<String> =
2068                    def.constraints.iter().map(|c| format!("{c:?}")).collect();
2069                let parents = def.parent_types.join(", ");
2070                Some(vec![
2071                    Value::from(name),
2072                    Value::from(props.join(", ")),
2073                    Value::from(constraints.join(", ")),
2074                    Value::from(parents),
2075                ])
2076            })
2077            .collect();
2078        Ok(QueryResult {
2079            columns,
2080            column_types: Vec::new(),
2081            rows,
2082            ..QueryResult::empty()
2083        })
2084    }
2085
2086    /// Returns a table of all registered edge types in the current schema.
2087    fn execute_show_edge_types(&self) -> Result<QueryResult> {
2088        let columns = vec![
2089            "name".to_string(),
2090            "properties".to_string(),
2091            "source_types".to_string(),
2092            "target_types".to_string(),
2093        ];
2094        let schema = self.current_schema.lock().clone();
2095        let all_names = self.catalog.all_edge_type_names();
2096        let type_names: Vec<String> = match &schema {
2097            Some(s) => {
2098                let prefix = format!("{s}/");
2099                all_names
2100                    .into_iter()
2101                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2102                    .collect()
2103            }
2104            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2105        };
2106        let rows: Vec<Vec<Value>> = type_names
2107            .into_iter()
2108            .filter_map(|name| {
2109                let lookup = match &schema {
2110                    Some(s) => format!("{s}/{name}"),
2111                    None => name.clone(),
2112                };
2113                let def = self.catalog.get_edge_type_def(&lookup)?;
2114                let props: Vec<String> = def
2115                    .properties
2116                    .iter()
2117                    .map(|p| {
2118                        let nullable = if p.nullable { "" } else { " NOT NULL" };
2119                        format!("{} {}{}", p.name, p.data_type, nullable)
2120                    })
2121                    .collect();
2122                let src = def.source_node_types.join(", ");
2123                let tgt = def.target_node_types.join(", ");
2124                Some(vec![
2125                    Value::from(name),
2126                    Value::from(props.join(", ")),
2127                    Value::from(src),
2128                    Value::from(tgt),
2129                ])
2130            })
2131            .collect();
2132        Ok(QueryResult {
2133            columns,
2134            column_types: Vec::new(),
2135            rows,
2136            ..QueryResult::empty()
2137        })
2138    }
2139
2140    /// Returns a table of all registered graph types in the current schema.
2141    fn execute_show_graph_types(&self) -> Result<QueryResult> {
2142        let columns = vec![
2143            "name".to_string(),
2144            "open".to_string(),
2145            "node_types".to_string(),
2146            "edge_types".to_string(),
2147        ];
2148        let schema = self.current_schema.lock().clone();
2149        let all_names = self.catalog.all_graph_type_names();
2150        let type_names: Vec<String> = match &schema {
2151            Some(s) => {
2152                let prefix = format!("{s}/");
2153                all_names
2154                    .into_iter()
2155                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2156                    .collect()
2157            }
2158            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2159        };
2160        let rows: Vec<Vec<Value>> = type_names
2161            .into_iter()
2162            .filter_map(|name| {
2163                let lookup = match &schema {
2164                    Some(s) => format!("{s}/{name}"),
2165                    None => name.clone(),
2166                };
2167                let def = self.catalog.get_graph_type_def(&lookup)?;
2168                // Strip schema prefix from allowed type names for display
2169                let strip = |n: &String| -> String {
2170                    match &schema {
2171                        Some(s) => n.strip_prefix(&format!("{s}/")).unwrap_or(n).to_string(),
2172                        None => n.clone(),
2173                    }
2174                };
2175                let node_types: Vec<String> = def.allowed_node_types.iter().map(strip).collect();
2176                let edge_types: Vec<String> = def.allowed_edge_types.iter().map(strip).collect();
2177                Some(vec![
2178                    Value::from(name),
2179                    Value::from(def.open),
2180                    Value::from(node_types.join(", ")),
2181                    Value::from(edge_types.join(", ")),
2182                ])
2183            })
2184            .collect();
2185        Ok(QueryResult {
2186            columns,
2187            column_types: Vec::new(),
2188            rows,
2189            ..QueryResult::empty()
2190        })
2191    }
2192
2193    /// Returns the list of named graphs visible in the current schema context.
2194    ///
2195    /// When a session schema is set, only graphs belonging to that schema are
2196    /// shown (their compound prefix is stripped). When no schema is set, graphs
2197    /// without a schema prefix are shown (the default schema).
2198    #[cfg(feature = "lpg")]
2199    fn execute_show_graphs(&self) -> Result<QueryResult> {
2200        let schema = self.current_schema.lock().clone();
2201        let all_names = self.store.graph_names();
2202
2203        let mut names: Vec<String> = match &schema {
2204            Some(s) => {
2205                let prefix = format!("{s}/");
2206                all_names
2207                    .into_iter()
2208                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2209                    .filter(|n| n != SCHEMA_DEFAULT_GRAPH)
2210                    .collect()
2211            }
2212            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2213        };
2214        names.sort();
2215
2216        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2217        Ok(QueryResult {
2218            columns: vec!["name".to_string()],
2219            column_types: Vec::new(),
2220            rows,
2221            ..QueryResult::empty()
2222        })
2223    }
2224
2225    /// Returns the list of all schema namespaces.
2226    fn execute_show_schemas(&self) -> Result<QueryResult> {
2227        let mut names = self.catalog.schema_names();
2228        names.sort();
2229        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2230        Ok(QueryResult {
2231            columns: vec!["name".to_string()],
2232            column_types: Vec::new(),
2233            rows,
2234            ..QueryResult::empty()
2235        })
2236    }
2237
2238    /// Returns detailed info for a specific graph type.
2239    fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
2240        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
2241
2242        let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
2243            Error::Query(QueryError::new(
2244                QueryErrorKind::Semantic,
2245                format!("Graph type '{name}' not found"),
2246            ))
2247        })?;
2248
2249        let columns = vec![
2250            "name".to_string(),
2251            "open".to_string(),
2252            "node_types".to_string(),
2253            "edge_types".to_string(),
2254        ];
2255        let rows = vec![vec![
2256            Value::from(def.name),
2257            Value::from(def.open),
2258            Value::from(def.allowed_node_types.join(", ")),
2259            Value::from(def.allowed_edge_types.join(", ")),
2260        ]];
2261        Ok(QueryResult {
2262            columns,
2263            column_types: Vec::new(),
2264            rows,
2265            ..QueryResult::empty()
2266        })
2267    }
2268
2269    /// Returns the graph type bound to the current graph.
2270    fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
2271        let graph_name = self
2272            .current_graph()
2273            .unwrap_or_else(|| "default".to_string());
2274        let columns = vec![
2275            "graph".to_string(),
2276            "graph_type".to_string(),
2277            "open".to_string(),
2278            "node_types".to_string(),
2279            "edge_types".to_string(),
2280        ];
2281
2282        if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
2283            && let Some(def) = self.catalog.get_graph_type_def(&type_name)
2284        {
2285            let rows = vec![vec![
2286                Value::from(graph_name),
2287                Value::from(type_name),
2288                Value::from(def.open),
2289                Value::from(def.allowed_node_types.join(", ")),
2290                Value::from(def.allowed_edge_types.join(", ")),
2291            ]];
2292            return Ok(QueryResult {
2293                columns,
2294                column_types: Vec::new(),
2295                rows,
2296                ..QueryResult::empty()
2297            });
2298        }
2299
2300        // No graph type binding found
2301        Ok(QueryResult {
2302            columns,
2303            column_types: Vec::new(),
2304            rows: vec![vec![
2305                Value::from(graph_name),
2306                Value::Null,
2307                Value::Null,
2308                Value::Null,
2309                Value::Null,
2310            ]],
2311            ..QueryResult::empty()
2312        })
2313    }
2314
    /// Executes a GQL query.
    ///
    /// The query is translated first. Session commands and schema (DDL)
    /// commands are dispatched to their dedicated handlers and return
    /// immediately. Regular queries are bound, optimized (or served from the
    /// optimized-plan cache, keyed by query text, language and current graph),
    /// physically planned against the current transaction context and run.
    ///
    /// `EXPLAIN` queries return the annotated plan without executing it, and
    /// `PROFILE` queries execute the plan but return the per-operator profile
    /// instead of the query rows.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails to parse or execute. While the
    /// session's read-only transaction flag is set, schema commands and plans
    /// containing mutations are rejected with a read-only transaction error.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use grafeo_engine::GrafeoDB;
    ///
    /// let db = GrafeoDB::new_in_memory();
    /// let session = db.session();
    ///
    /// // Create a node
    /// session.execute("INSERT (:Person {name: 'Alix', age: 30})")?;
    ///
    /// // Query nodes
    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
    /// for row in result.rows() {
    ///     println!("{:?}", row);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(feature = "gql")]
    pub fn execute(&self, query: &str) -> Result<QueryResult> {
        self.require_lpg("GQL")?;

        use crate::query::{
            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
            processor::QueryLanguage, translators::gql,
        };

        let _span = grafeo_info_span!(
            "grafeo::session::execute",
            language = "gql",
            query_len = query.len(),
        );

        // Timing is only captured on non-wasm targets; the wasm branches
        // below substitute a zero / absent duration instead.
        #[cfg(not(target_arch = "wasm32"))]
        let start_time = std::time::Instant::now();

        // Parse and translate, checking for session/schema commands first
        let translation = gql::translate_full(query)?;
        let logical_plan = match translation {
            gql::GqlTranslationResult::SessionCommand(cmd) => {
                return self.execute_session_command(cmd);
            }
            #[cfg(feature = "lpg")]
            gql::GqlTranslationResult::SchemaCommand(cmd) => {
                // All DDL is a write operation
                if *self.read_only_tx.lock() {
                    return Err(grafeo_common::utils::error::Error::Transaction(
                        grafeo_common::utils::error::TransactionError::ReadOnly,
                    ));
                }
                return self.execute_schema_command(cmd);
            }
            gql::GqlTranslationResult::Plan(plan) => {
                // Block mutations in read-only transactions
                if *self.read_only_tx.lock() && plan.root.has_mutations() {
                    return Err(grafeo_common::utils::error::Error::Transaction(
                        grafeo_common::utils::error::TransactionError::ReadOnly,
                    ));
                }
                plan
            }
            #[cfg(not(feature = "lpg"))]
            gql::GqlTranslationResult::SchemaCommand(_) => {
                return Err(grafeo_common::utils::error::Error::Internal(
                    "Schema commands require the `lpg` feature".to_string(),
                ));
            }
        };

        // Create cache key for this query
        let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());

        // Try to get cached optimized plan, or use the plan we just translated
        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
            cached_plan
        } else {
            // Semantic validation
            let mut binder = Binder::new();
            let _binding_context = binder.bind(&logical_plan)?;

            // Optimize the plan
            let active = self.active_store();
            let optimizer = Optimizer::from_graph_store(&*active);
            let plan = optimizer.optimize(logical_plan)?;

            // Cache the optimized plan for future use
            self.query_cache.put_optimized(cache_key, plan.clone());

            plan
        };

        // Resolve the active store for query execution
        let active = self.active_store();

        // EXPLAIN: annotate pushdown hints and return the plan tree
        if optimized_plan.explain {
            use crate::query::processor::{annotate_pushdown_hints, explain_result};
            let mut plan = optimized_plan;
            annotate_pushdown_hints(&mut plan.root, active.as_ref());
            return Ok(explain_result(&plan));
        }

        // PROFILE: execute with per-operator instrumentation
        if optimized_plan.profile {
            let has_mutations = optimized_plan.root.has_mutations();
            return self.with_auto_commit(has_mutations, || {
                let (viewing_epoch, transaction_id) = self.get_transaction_context();
                let planner = self.create_planner_for_store(
                    Arc::clone(&active),
                    viewing_epoch,
                    transaction_id,
                );
                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;

                let executor = Executor::with_columns(physical_plan.columns.clone())
                    .with_deadline(self.query_deadline());
                // The query rows are discarded: PROFILE reports the profile
                // tree rather than the query output.
                let _result = executor.execute(physical_plan.operator.as_mut())?;

                let total_time_ms;
                #[cfg(not(target_arch = "wasm32"))]
                {
                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
                }
                #[cfg(target_arch = "wasm32")]
                {
                    total_time_ms = 0.0;
                }

                let profile_tree = crate::query::profile::build_profile_tree(
                    &optimized_plan.root,
                    &mut entries.into_iter(),
                );
                Ok(crate::query::profile::profile_result(
                    &profile_tree,
                    total_time_ms,
                ))
            });
        }

        let has_mutations = optimized_plan.root.has_mutations();

        let result = self.with_auto_commit(has_mutations, || {
            // Get transaction context for MVCC visibility
            let (viewing_epoch, transaction_id) = self.get_transaction_context();

            // Convert to physical plan with transaction context
            // (Physical planning cannot be cached as it depends on transaction state)
            // Safe to use read-only fast path when: this query has no mutations AND
            // there is no active transaction that may have prior uncommitted writes.
            let has_active_tx = self.current_transaction.lock().is_some();
            let read_only = !has_mutations && !has_active_tx;
            let planner = self.create_planner_for_store_with_read_only(
                Arc::clone(&active),
                viewing_epoch,
                transaction_id,
                read_only,
            );
            let mut physical_plan = planner.plan(&optimized_plan)?;

            // Execute the plan
            let executor = Executor::with_columns(physical_plan.columns.clone())
                .with_deadline(self.query_deadline());
            let mut result = executor.execute(physical_plan.operator.as_mut())?;

            // Add execution metrics
            // NOTE(review): `rows_scanned` is filled with the number of rows
            // in the result, not a count of rows read from storage - confirm
            // this is the intended meaning.
            let rows_scanned = result.rows.len() as u64;
            #[cfg(not(target_arch = "wasm32"))]
            {
                let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
                result.execution_time_ms = Some(elapsed_ms);
            }
            result.rows_scanned = Some(rows_scanned);

            Ok(result)
        });

        // Record metrics for this query execution.
        #[cfg(feature = "metrics")]
        {
            #[cfg(not(target_arch = "wasm32"))]
            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
            #[cfg(target_arch = "wasm32")]
            let elapsed_ms = None;
            self.record_query_metrics("gql", elapsed_ms, &result);
        }

        result
    }
2511
2512    /// Executes a GQL query with visibility at the specified epoch.
2513    ///
2514    /// This enables time-travel queries: the query sees the database
2515    /// as it existed at the given epoch.
2516    ///
2517    /// # Errors
2518    ///
2519    /// Returns an error if parsing or execution fails.
2520    #[cfg(feature = "gql")]
2521    pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
2522        let previous = self.viewing_epoch_override.lock().replace(epoch);
2523        let result = self.execute(query);
2524        *self.viewing_epoch_override.lock() = previous;
2525        result
2526    }
2527
2528    /// Executes a GQL query at a specific epoch with optional parameters.
2529    ///
2530    /// Combines epoch-based time travel with parameterized queries.
2531    ///
2532    /// # Errors
2533    ///
2534    /// Returns an error if parsing or execution fails.
2535    #[cfg(feature = "gql")]
2536    pub fn execute_at_epoch_with_params(
2537        &self,
2538        query: &str,
2539        epoch: EpochId,
2540        params: Option<std::collections::HashMap<String, Value>>,
2541    ) -> Result<QueryResult> {
2542        let previous = self.viewing_epoch_override.lock().replace(epoch);
2543        let result = if let Some(p) = params {
2544            self.execute_with_params(query, p)
2545        } else {
2546            self.execute(query)
2547        };
2548        *self.viewing_epoch_override.lock() = previous;
2549        result
2550    }
2551
2552    /// Executes a GQL query with parameters.
2553    ///
2554    /// # Errors
2555    ///
2556    /// Returns an error if the query fails to parse or execute.
2557    #[cfg(feature = "gql")]
2558    pub fn execute_with_params(
2559        &self,
2560        query: &str,
2561        params: std::collections::HashMap<String, Value>,
2562    ) -> Result<QueryResult> {
2563        self.require_lpg("GQL")?;
2564
2565        use crate::query::processor::{QueryLanguage, QueryProcessor};
2566
2567        let has_mutations = Self::query_looks_like_mutation(query);
2568        let active = self.active_store();
2569
2570        self.with_auto_commit(has_mutations, || {
2571            // Get transaction context for MVCC visibility
2572            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2573
2574            // Create processor with transaction context
2575            let processor = QueryProcessor::for_stores_with_transaction(
2576                Arc::clone(&active),
2577                self.active_write_store(),
2578                Arc::clone(&self.transaction_manager),
2579            )?;
2580
2581            // Apply transaction context if in a transaction
2582            let processor = if let Some(transaction_id) = transaction_id {
2583                processor.with_transaction_context(viewing_epoch, transaction_id)
2584            } else {
2585                processor
2586            };
2587
2588            processor.process(query, QueryLanguage::Gql, Some(&params))
2589        })
2590    }
2591
2592    /// Executes a GQL query with parameters.
2593    ///
2594    /// # Errors
2595    ///
2596    /// Returns an error if no query language is enabled.
2597    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2598    pub fn execute_with_params(
2599        &self,
2600        _query: &str,
2601        _params: std::collections::HashMap<String, Value>,
2602    ) -> Result<QueryResult> {
2603        Err(grafeo_common::utils::error::Error::Internal(
2604            "No query language enabled".to_string(),
2605        ))
2606    }
2607
2608    /// Executes a GQL query.
2609    ///
2610    /// # Errors
2611    ///
2612    /// Returns an error if no query language is enabled.
2613    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2614    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2615        Err(grafeo_common::utils::error::Error::Internal(
2616            "No query language enabled".to_string(),
2617        ))
2618    }
2619
2620    /// Executes a Cypher query.
2621    ///
2622    /// # Errors
2623    ///
2624    /// Returns an error if the query fails to parse or execute.
2625    #[cfg(feature = "cypher")]
2626    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
2627        use crate::query::{
2628            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2629            processor::QueryLanguage, translators::cypher,
2630        };
2631
2632        // Handle schema DDL and SHOW commands before the normal query path
2633        let translation = cypher::translate_full(query)?;
2634        match translation {
2635            #[cfg(feature = "lpg")]
2636            cypher::CypherTranslationResult::SchemaCommand(cmd) => {
2637                use grafeo_common::utils::error::{
2638                    Error as GrafeoError, QueryError, QueryErrorKind,
2639                };
2640                if *self.read_only_tx.lock() {
2641                    return Err(GrafeoError::Query(QueryError::new(
2642                        QueryErrorKind::Semantic,
2643                        "Cannot execute schema DDL in a read-only transaction",
2644                    )));
2645                }
2646                return self.execute_schema_command(cmd);
2647            }
2648            #[cfg(not(feature = "lpg"))]
2649            cypher::CypherTranslationResult::SchemaCommand(_) => {
2650                return Err(grafeo_common::utils::error::Error::Internal(
2651                    "Schema DDL requires the `lpg` feature".to_string(),
2652                ));
2653            }
2654            cypher::CypherTranslationResult::ShowIndexes => {
2655                return self.execute_show_indexes();
2656            }
2657            cypher::CypherTranslationResult::ShowConstraints => {
2658                return self.execute_show_constraints();
2659            }
2660            cypher::CypherTranslationResult::ShowCurrentGraphType => {
2661                return self.execute_show_current_graph_type();
2662            }
2663            cypher::CypherTranslationResult::Plan(_) => {
2664                // Fall through to normal execution below
2665            }
2666        }
2667
2668        #[cfg(not(target_arch = "wasm32"))]
2669        let start_time = std::time::Instant::now();
2670
2671        // Create cache key for this query
2672        let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());
2673
2674        // Try to get cached optimized plan
2675        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2676            cached_plan
2677        } else {
2678            // Parse and translate the query to a logical plan
2679            let logical_plan = cypher::translate(query)?;
2680
2681            // Semantic validation
2682            let mut binder = Binder::new();
2683            let _binding_context = binder.bind(&logical_plan)?;
2684
2685            // Optimize the plan
2686            let active = self.active_store();
2687            let optimizer = Optimizer::from_graph_store(&*active);
2688            let plan = optimizer.optimize(logical_plan)?;
2689
2690            // Cache the optimized plan
2691            self.query_cache.put_optimized(cache_key, plan.clone());
2692
2693            plan
2694        };
2695
2696        // Resolve the active store for query execution
2697        let active = self.active_store();
2698
2699        // EXPLAIN
2700        if optimized_plan.explain {
2701            use crate::query::processor::{annotate_pushdown_hints, explain_result};
2702            let mut plan = optimized_plan;
2703            annotate_pushdown_hints(&mut plan.root, active.as_ref());
2704            return Ok(explain_result(&plan));
2705        }
2706
2707        // PROFILE
2708        if optimized_plan.profile {
2709            let has_mutations = optimized_plan.root.has_mutations();
2710            return self.with_auto_commit(has_mutations, || {
2711                let (viewing_epoch, transaction_id) = self.get_transaction_context();
2712                let planner = self.create_planner_for_store(
2713                    Arc::clone(&active),
2714                    viewing_epoch,
2715                    transaction_id,
2716                );
2717                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2718
2719                let executor = Executor::with_columns(physical_plan.columns.clone())
2720                    .with_deadline(self.query_deadline());
2721                let _result = executor.execute(physical_plan.operator.as_mut())?;
2722
2723                let total_time_ms;
2724                #[cfg(not(target_arch = "wasm32"))]
2725                {
2726                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2727                }
2728                #[cfg(target_arch = "wasm32")]
2729                {
2730                    total_time_ms = 0.0;
2731                }
2732
2733                let profile_tree = crate::query::profile::build_profile_tree(
2734                    &optimized_plan.root,
2735                    &mut entries.into_iter(),
2736                );
2737                Ok(crate::query::profile::profile_result(
2738                    &profile_tree,
2739                    total_time_ms,
2740                ))
2741            });
2742        }
2743
2744        let has_mutations = optimized_plan.root.has_mutations();
2745
2746        let result = self.with_auto_commit(has_mutations, || {
2747            // Get transaction context for MVCC visibility
2748            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2749
2750            // Convert to physical plan with transaction context
2751            let planner =
2752                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2753            let mut physical_plan = planner.plan(&optimized_plan)?;
2754
2755            // Execute the plan
2756            let executor = Executor::with_columns(physical_plan.columns.clone())
2757                .with_deadline(self.query_deadline());
2758            executor.execute(physical_plan.operator.as_mut())
2759        });
2760
2761        #[cfg(feature = "metrics")]
2762        {
2763            #[cfg(not(target_arch = "wasm32"))]
2764            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2765            #[cfg(target_arch = "wasm32")]
2766            let elapsed_ms = None;
2767            self.record_query_metrics("cypher", elapsed_ms, &result);
2768        }
2769
2770        result
2771    }
2772
2773    /// Executes a Gremlin query.
2774    ///
2775    /// # Errors
2776    ///
2777    /// Returns an error if the query fails to parse or execute.
2778    ///
2779    /// # Examples
2780    ///
2781    /// ```no_run
2782    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2783    /// use grafeo_engine::GrafeoDB;
2784    ///
2785    /// let db = GrafeoDB::new_in_memory();
2786    /// let session = db.session();
2787    ///
2788    /// // Create some nodes first
2789    /// session.create_node(&["Person"]);
2790    ///
2791    /// // Query using Gremlin
2792    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
2793    /// # Ok(())
2794    /// # }
2795    /// ```
2796    #[cfg(feature = "gremlin")]
2797    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
2798        use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};
2799
2800        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2801        let start_time = Instant::now();
2802
2803        // Parse and translate the query to a logical plan
2804        let logical_plan = gremlin::translate(query)?;
2805
2806        // Semantic validation
2807        let mut binder = Binder::new();
2808        let _binding_context = binder.bind(&logical_plan)?;
2809
2810        // Optimize the plan
2811        let active = self.active_store();
2812        let optimizer = Optimizer::from_graph_store(&*active);
2813        let optimized_plan = optimizer.optimize(logical_plan)?;
2814
2815        let has_mutations = optimized_plan.root.has_mutations();
2816
2817        let result = self.with_auto_commit(has_mutations, || {
2818            // Get transaction context for MVCC visibility
2819            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2820
2821            // Convert to physical plan with transaction context
2822            let planner =
2823                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2824            let mut physical_plan = planner.plan(&optimized_plan)?;
2825
2826            // Execute the plan
2827            let executor = Executor::with_columns(physical_plan.columns.clone())
2828                .with_deadline(self.query_deadline());
2829            executor.execute(physical_plan.operator.as_mut())
2830        });
2831
2832        #[cfg(feature = "metrics")]
2833        {
2834            #[cfg(not(target_arch = "wasm32"))]
2835            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2836            #[cfg(target_arch = "wasm32")]
2837            let elapsed_ms = None;
2838            self.record_query_metrics("gremlin", elapsed_ms, &result);
2839        }
2840
2841        result
2842    }
2843
2844    /// Executes a Gremlin query with parameters.
2845    ///
2846    /// # Errors
2847    ///
2848    /// Returns an error if the query fails to parse or execute.
2849    #[cfg(feature = "gremlin")]
2850    pub fn execute_gremlin_with_params(
2851        &self,
2852        query: &str,
2853        params: std::collections::HashMap<String, Value>,
2854    ) -> Result<QueryResult> {
2855        use crate::query::processor::{QueryLanguage, QueryProcessor};
2856
2857        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2858        let start_time = Instant::now();
2859
2860        let has_mutations = Self::query_looks_like_mutation(query);
2861        let active = self.active_store();
2862
2863        let result = self.with_auto_commit(has_mutations, || {
2864            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2865            let processor = QueryProcessor::for_stores_with_transaction(
2866                Arc::clone(&active),
2867                self.active_write_store(),
2868                Arc::clone(&self.transaction_manager),
2869            )?;
2870            let processor = if let Some(transaction_id) = transaction_id {
2871                processor.with_transaction_context(viewing_epoch, transaction_id)
2872            } else {
2873                processor
2874            };
2875            processor.process(query, QueryLanguage::Gremlin, Some(&params))
2876        });
2877
2878        #[cfg(feature = "metrics")]
2879        {
2880            #[cfg(not(target_arch = "wasm32"))]
2881            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2882            #[cfg(target_arch = "wasm32")]
2883            let elapsed_ms = None;
2884            self.record_query_metrics("gremlin", elapsed_ms, &result);
2885        }
2886
2887        result
2888    }
2889
2890    /// Executes a GraphQL query against the LPG store.
2891    ///
2892    /// # Errors
2893    ///
2894    /// Returns an error if the query fails to parse or execute.
2895    ///
2896    /// # Examples
2897    ///
2898    /// ```no_run
2899    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2900    /// use grafeo_engine::GrafeoDB;
2901    ///
2902    /// let db = GrafeoDB::new_in_memory();
2903    /// let session = db.session();
2904    ///
2905    /// // Create some nodes first
2906    /// session.create_node(&["User"]);
2907    ///
2908    /// // Query using GraphQL
2909    /// let result = session.execute_graphql("query { user { id name } }")?;
2910    /// # Ok(())
2911    /// # }
2912    /// ```
2913    #[cfg(feature = "graphql")]
2914    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
2915        use crate::query::{
2916            Executor, binder::Binder, optimizer::Optimizer, processor::substitute_params,
2917            translators::graphql,
2918        };
2919
2920        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2921        let start_time = Instant::now();
2922
2923        let mut logical_plan = graphql::translate(query)?;
2924
2925        // Substitute default parameter values from variable declarations
2926        if !logical_plan.default_params.is_empty() {
2927            let defaults = logical_plan.default_params.clone();
2928            substitute_params(&mut logical_plan, &defaults)?;
2929        }
2930
2931        let mut binder = Binder::new();
2932        let _binding_context = binder.bind(&logical_plan)?;
2933
2934        let active = self.active_store();
2935        let optimizer = Optimizer::from_graph_store(&*active);
2936        let optimized_plan = optimizer.optimize(logical_plan)?;
2937        let has_mutations = optimized_plan.root.has_mutations();
2938
2939        let result = self.with_auto_commit(has_mutations, || {
2940            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2941            let planner =
2942                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2943            let mut physical_plan = planner.plan(&optimized_plan)?;
2944            let executor = Executor::with_columns(physical_plan.columns.clone())
2945                .with_deadline(self.query_deadline());
2946            executor.execute(physical_plan.operator.as_mut())
2947        });
2948
2949        #[cfg(feature = "metrics")]
2950        {
2951            #[cfg(not(target_arch = "wasm32"))]
2952            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2953            #[cfg(target_arch = "wasm32")]
2954            let elapsed_ms = None;
2955            self.record_query_metrics("graphql", elapsed_ms, &result);
2956        }
2957
2958        result
2959    }
2960
2961    /// Executes a GraphQL query with parameters.
2962    ///
2963    /// # Errors
2964    ///
2965    /// Returns an error if the query fails to parse or execute.
2966    #[cfg(feature = "graphql")]
2967    pub fn execute_graphql_with_params(
2968        &self,
2969        query: &str,
2970        params: std::collections::HashMap<String, Value>,
2971    ) -> Result<QueryResult> {
2972        use crate::query::processor::{QueryLanguage, QueryProcessor};
2973
2974        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2975        let start_time = Instant::now();
2976
2977        let has_mutations = Self::query_looks_like_mutation(query);
2978        let active = self.active_store();
2979
2980        let result = self.with_auto_commit(has_mutations, || {
2981            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2982            let processor = QueryProcessor::for_stores_with_transaction(
2983                Arc::clone(&active),
2984                self.active_write_store(),
2985                Arc::clone(&self.transaction_manager),
2986            )?;
2987            let processor = if let Some(transaction_id) = transaction_id {
2988                processor.with_transaction_context(viewing_epoch, transaction_id)
2989            } else {
2990                processor
2991            };
2992            processor.process(query, QueryLanguage::GraphQL, Some(&params))
2993        });
2994
2995        #[cfg(feature = "metrics")]
2996        {
2997            #[cfg(not(target_arch = "wasm32"))]
2998            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2999            #[cfg(target_arch = "wasm32")]
3000            let elapsed_ms = None;
3001            self.record_query_metrics("graphql", elapsed_ms, &result);
3002        }
3003
3004        result
3005    }
3006
3007    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
3008    ///
3009    /// # Errors
3010    ///
3011    /// Returns an error if the query fails to parse or execute.
3012    ///
3013    /// # Examples
3014    ///
3015    /// ```no_run
3016    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3017    /// use grafeo_engine::GrafeoDB;
3018    ///
3019    /// let db = GrafeoDB::new_in_memory();
3020    /// let session = db.session();
3021    ///
3022    /// let result = session.execute_sql(
3023    ///     "SELECT * FROM GRAPH_TABLE (
3024    ///         MATCH (n:Person)
3025    ///         COLUMNS (n.name AS name)
3026    ///     )"
3027    /// )?;
3028    /// # Ok(())
3029    /// # }
3030    /// ```
3031    #[cfg(feature = "sql-pgq")]
3032    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
3033        use crate::query::{
3034            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
3035            processor::QueryLanguage, translators::sql_pgq,
3036        };
3037
3038        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3039        let start_time = Instant::now();
3040
3041        // Parse and translate (always needed to check for DDL)
3042        let logical_plan = sql_pgq::translate(query)?;
3043
3044        // Handle DDL statements directly (they don't go through the query pipeline)
3045        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
3046            return Ok(QueryResult {
3047                columns: vec!["status".into()],
3048                column_types: vec![grafeo_common::types::LogicalType::String],
3049                rows: vec![vec![Value::from(format!(
3050                    "Property graph '{}' created ({} node tables, {} edge tables)",
3051                    cpg.name,
3052                    cpg.node_tables.len(),
3053                    cpg.edge_tables.len()
3054                ))]],
3055                execution_time_ms: None,
3056                rows_scanned: None,
3057                status_message: None,
3058                gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
3059            });
3060        }
3061
3062        let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());
3063
3064        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
3065            cached_plan
3066        } else {
3067            let mut binder = Binder::new();
3068            let _binding_context = binder.bind(&logical_plan)?;
3069            let active = self.active_store();
3070            let optimizer = Optimizer::from_graph_store(&*active);
3071            let plan = optimizer.optimize(logical_plan)?;
3072            self.query_cache.put_optimized(cache_key, plan.clone());
3073            plan
3074        };
3075
3076        let active = self.active_store();
3077        let has_mutations = optimized_plan.root.has_mutations();
3078
3079        let result = self.with_auto_commit(has_mutations, || {
3080            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3081            let planner =
3082                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3083            let mut physical_plan = planner.plan(&optimized_plan)?;
3084            let executor = Executor::with_columns(physical_plan.columns.clone())
3085                .with_deadline(self.query_deadline());
3086            executor.execute(physical_plan.operator.as_mut())
3087        });
3088
3089        #[cfg(feature = "metrics")]
3090        {
3091            #[cfg(not(target_arch = "wasm32"))]
3092            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3093            #[cfg(target_arch = "wasm32")]
3094            let elapsed_ms = None;
3095            self.record_query_metrics("sql", elapsed_ms, &result);
3096        }
3097
3098        result
3099    }
3100
3101    /// Executes a SQL/PGQ query with parameters.
3102    ///
3103    /// # Errors
3104    ///
3105    /// Returns an error if the query fails to parse or execute.
3106    #[cfg(feature = "sql-pgq")]
3107    pub fn execute_sql_with_params(
3108        &self,
3109        query: &str,
3110        params: std::collections::HashMap<String, Value>,
3111    ) -> Result<QueryResult> {
3112        use crate::query::processor::{QueryLanguage, QueryProcessor};
3113
3114        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3115        let start_time = Instant::now();
3116
3117        let has_mutations = Self::query_looks_like_mutation(query);
3118        let active = self.active_store();
3119
3120        let result = self.with_auto_commit(has_mutations, || {
3121            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3122            let processor = QueryProcessor::for_stores_with_transaction(
3123                Arc::clone(&active),
3124                self.active_write_store(),
3125                Arc::clone(&self.transaction_manager),
3126            )?;
3127            let processor = if let Some(transaction_id) = transaction_id {
3128                processor.with_transaction_context(viewing_epoch, transaction_id)
3129            } else {
3130                processor
3131            };
3132            processor.process(query, QueryLanguage::SqlPgq, Some(&params))
3133        });
3134
3135        #[cfg(feature = "metrics")]
3136        {
3137            #[cfg(not(target_arch = "wasm32"))]
3138            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3139            #[cfg(target_arch = "wasm32")]
3140            let elapsed_ms = None;
3141            self.record_query_metrics("sql", elapsed_ms, &result);
3142        }
3143
3144        result
3145    }
3146
3147    /// Executes a query in the specified language by name.
3148    ///
3149    /// Supported language names: `"gql"`, `"cypher"`, `"gremlin"`, `"graphql"`,
3150    /// `"graphql-rdf"`, `"sparql"`, `"sql"`. Each requires the corresponding feature flag.
3151    ///
3152    /// # Errors
3153    ///
3154    /// Returns an error if the language is unknown/disabled or the query fails.
3155    pub fn execute_language(
3156        &self,
3157        query: &str,
3158        language: &str,
3159        params: Option<std::collections::HashMap<String, Value>>,
3160    ) -> Result<QueryResult> {
3161        let _span = grafeo_info_span!(
3162            "grafeo::session::execute",
3163            language,
3164            query_len = query.len(),
3165        );
3166        match language {
3167            "gql" => {
3168                if let Some(p) = params {
3169                    self.execute_with_params(query, p)
3170                } else {
3171                    self.execute(query)
3172                }
3173            }
3174            #[cfg(feature = "cypher")]
3175            "cypher" => {
3176                if let Some(p) = params {
3177                    use crate::query::processor::{QueryLanguage, QueryProcessor};
3178
3179                    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3180                    let start_time = Instant::now();
3181
3182                    let has_mutations = Self::query_looks_like_mutation(query);
3183                    let active = self.active_store();
3184                    let result = self.with_auto_commit(has_mutations, || {
3185                        let processor = QueryProcessor::for_stores_with_transaction(
3186                            Arc::clone(&active),
3187                            self.active_write_store(),
3188                            Arc::clone(&self.transaction_manager),
3189                        )?;
3190                        let (viewing_epoch, transaction_id) = self.get_transaction_context();
3191                        let processor = if let Some(transaction_id) = transaction_id {
3192                            processor.with_transaction_context(viewing_epoch, transaction_id)
3193                        } else {
3194                            processor
3195                        };
3196                        processor.process(query, QueryLanguage::Cypher, Some(&p))
3197                    });
3198
3199                    #[cfg(feature = "metrics")]
3200                    {
3201                        #[cfg(not(target_arch = "wasm32"))]
3202                        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3203                        #[cfg(target_arch = "wasm32")]
3204                        let elapsed_ms = None;
3205                        self.record_query_metrics("cypher", elapsed_ms, &result);
3206                    }
3207
3208                    result
3209                } else {
3210                    self.execute_cypher(query)
3211                }
3212            }
3213            #[cfg(feature = "gremlin")]
3214            "gremlin" => {
3215                if let Some(p) = params {
3216                    self.execute_gremlin_with_params(query, p)
3217                } else {
3218                    self.execute_gremlin(query)
3219                }
3220            }
3221            #[cfg(feature = "graphql")]
3222            "graphql" => {
3223                if let Some(p) = params {
3224                    self.execute_graphql_with_params(query, p)
3225                } else {
3226                    self.execute_graphql(query)
3227                }
3228            }
3229            #[cfg(all(feature = "graphql", feature = "triple-store"))]
3230            "graphql-rdf" => {
3231                if let Some(p) = params {
3232                    self.execute_graphql_rdf_with_params(query, p)
3233                } else {
3234                    self.execute_graphql_rdf(query)
3235                }
3236            }
3237            #[cfg(feature = "sql-pgq")]
3238            "sql" | "sql-pgq" => {
3239                if let Some(p) = params {
3240                    self.execute_sql_with_params(query, p)
3241                } else {
3242                    self.execute_sql(query)
3243                }
3244            }
3245            #[cfg(all(feature = "sparql", feature = "triple-store"))]
3246            "sparql" => {
3247                if let Some(p) = params {
3248                    self.execute_sparql_with_params(query, p)
3249                } else {
3250                    self.execute_sparql(query)
3251                }
3252            }
3253            other => Err(grafeo_common::utils::error::Error::Query(
3254                grafeo_common::utils::error::QueryError::new(
3255                    grafeo_common::utils::error::QueryErrorKind::Semantic,
3256                    format!("Unknown query language: '{other}'"),
3257                ),
3258            )),
3259        }
3260    }
3261
3262    /// Begins a new transaction.
3263    ///
3264    /// # Errors
3265    ///
3266    /// Returns an error if a transaction is already active.
3267    ///
3268    /// # Examples
3269    ///
3270    /// ```no_run
3271    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3272    /// use grafeo_engine::GrafeoDB;
3273    ///
3274    /// let db = GrafeoDB::new_in_memory();
3275    /// let mut session = db.session();
3276    ///
3277    /// session.begin_transaction()?;
3278    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3279    /// session.execute("INSERT (:Person {name: 'Gus'})")?;
3280    /// session.commit()?; // Both inserts committed atomically
3281    /// # Ok(())
3282    /// # }
3283    /// ```
3284    /// Clears all cached query plans.
3285    ///
3286    /// The plan cache is shared across all sessions on the same database,
3287    /// so clearing from one session affects all sessions.
3288    pub fn clear_plan_cache(&self) {
3289        self.query_cache.clear();
3290    }
3291
3292    /// Begins a new transaction on this session.
3293    ///
3294    /// Uses the default isolation level (`SnapshotIsolation`).
3295    ///
3296    /// # Errors
3297    ///
3298    /// Returns an error if a transaction is already active.
3299    #[cfg(feature = "lpg")]
3300    pub fn begin_transaction(&mut self) -> Result<()> {
3301        self.begin_transaction_inner(false, None)
3302    }
3303
3304    /// Begins a transaction with a specific isolation level.
3305    ///
3306    /// See [`begin_transaction`](Self::begin_transaction) for the default (`SnapshotIsolation`).
3307    ///
3308    /// # Errors
3309    ///
3310    /// Returns an error if a transaction is already active.
3311    #[cfg(feature = "lpg")]
3312    pub fn begin_transaction_with_isolation(
3313        &mut self,
3314        isolation_level: crate::transaction::IsolationLevel,
3315    ) -> Result<()> {
3316        self.begin_transaction_inner(false, Some(isolation_level))
3317    }
3318
3319    /// Core transaction begin logic, usable from both `&mut self` and `&self` paths.
3320    #[cfg(feature = "lpg")]
3321    fn begin_transaction_inner(
3322        &self,
3323        read_only: bool,
3324        isolation_level: Option<crate::transaction::IsolationLevel>,
3325    ) -> Result<()> {
3326        let _span = grafeo_debug_span!("grafeo::tx::begin", read_only);
3327        let mut current = self.current_transaction.lock();
3328        if current.is_some() {
3329            // Nested transaction: create an auto-savepoint instead of a new tx.
3330            drop(current);
3331            let mut depth = self.transaction_nesting_depth.lock();
3332            *depth += 1;
3333            let sp_name = format!("_nested_tx_{}", *depth);
3334            self.savepoint(&sp_name)?;
3335            return Ok(());
3336        }
3337
3338        let active = self.active_lpg_store();
3339        self.transaction_start_node_count
3340            .store(active.node_count(), Ordering::Relaxed);
3341        self.transaction_start_edge_count
3342            .store(active.edge_count(), Ordering::Relaxed);
3343        let transaction_id = if let Some(level) = isolation_level {
3344            self.transaction_manager.begin_with_isolation(level)
3345        } else {
3346            self.transaction_manager.begin()
3347        };
3348        *current = Some(transaction_id);
3349        *self.read_only_tx.lock() = read_only || self.db_read_only;
3350
3351        // Record the initial graph as "touched" for cross-graph atomicity.
3352        // Uses the full storage key (schema/graph) for schema-scoped resolution.
3353        let key = self.active_graph_storage_key();
3354        let mut touched = self.touched_graphs.lock();
3355        touched.clear();
3356        touched.push(key);
3357
3358        #[cfg(feature = "metrics")]
3359        {
3360            crate::metrics::record_metric!(self.metrics, tx_active, inc);
3361            #[cfg(not(target_arch = "wasm32"))]
3362            {
3363                *self.tx_start_time.lock() = Some(Instant::now());
3364            }
3365        }
3366
3367        Ok(())
3368    }
3369
3370    /// Commits the current transaction.
3371    ///
3372    /// Makes all changes since [`begin_transaction`](Self::begin_transaction) permanent.
3373    ///
3374    /// # Errors
3375    ///
3376    /// Returns an error if no transaction is active.
3377    #[cfg(feature = "lpg")]
3378    pub fn commit(&mut self) -> Result<()> {
3379        self.commit_inner()
3380    }
3381
3382    /// Core commit logic, usable from both `&mut self` and `&self` paths.
3383    #[cfg(feature = "lpg")]
3384    fn commit_inner(&self) -> Result<()> {
3385        let _span = grafeo_debug_span!("grafeo::tx::commit");
3386        // Nested transaction: release the auto-savepoint (changes are preserved).
3387        {
3388            let mut depth = self.transaction_nesting_depth.lock();
3389            if *depth > 0 {
3390                let sp_name = format!("_nested_tx_{depth}");
3391                *depth -= 1;
3392                drop(depth);
3393                return self.release_savepoint(&sp_name);
3394            }
3395        }
3396
3397        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3398            grafeo_common::utils::error::Error::Transaction(
3399                grafeo_common::utils::error::TransactionError::InvalidState(
3400                    "No active transaction".to_string(),
3401                ),
3402            )
3403        })?;
3404
3405        // Validate the transaction first (conflict detection) before committing data.
3406        // If this fails, we rollback the data changes instead of making them permanent.
3407        let touched = self.touched_graphs.lock().clone();
3408        let commit_epoch = match self.transaction_manager.commit(transaction_id) {
3409            Ok(epoch) => epoch,
3410            Err(e) => {
3411                // Conflict detected: rollback the data changes
3412                for graph_name in &touched {
3413                    let store = self.resolve_store(graph_name);
3414                    store.rollback_transaction_properties(transaction_id);
3415                }
3416                #[cfg(feature = "triple-store")]
3417                self.rollback_rdf_transaction(transaction_id);
3418                // Discard buffered CDC events on conflict rollback
3419                #[cfg(feature = "cdc")]
3420                if let Some(ref pending) = self.cdc_pending_events {
3421                    pending.lock().clear();
3422                }
3423                *self.read_only_tx.lock() = self.db_read_only;
3424                self.savepoints.lock().clear();
3425                self.touched_graphs.lock().clear();
3426                #[cfg(feature = "metrics")]
3427                {
3428                    crate::metrics::record_metric!(self.metrics, tx_active, dec);
3429                    crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
3430                    #[cfg(not(target_arch = "wasm32"))]
3431                    if let Some(start) = self.tx_start_time.lock().take() {
3432                        let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3433                        crate::metrics::record_metric!(
3434                            self.metrics,
3435                            tx_duration,
3436                            observe duration_ms
3437                        );
3438                    }
3439                }
3440                return Err(e);
3441            }
3442        };
3443
3444        // Finalize PENDING epochs: make uncommitted versions visible at the commit epoch.
3445        for graph_name in &touched {
3446            let store = self.resolve_store(graph_name);
3447            store.finalize_version_epochs(transaction_id, commit_epoch);
3448        }
3449
3450        // Commit succeeded: discard undo logs (make changes permanent)
3451        #[cfg(feature = "triple-store")]
3452        self.commit_rdf_transaction(transaction_id);
3453
3454        for graph_name in &touched {
3455            let store = self.resolve_store(graph_name);
3456            store.commit_transaction_properties(transaction_id);
3457        }
3458
3459        // Flush buffered CDC events now that the transaction is committed.
3460        // All buffered events have PENDING epoch; assign the real commit_epoch.
3461        // Uses record_batch to acquire the write lock once per commit.
3462        #[cfg(feature = "cdc")]
3463        if let Some(ref pending) = self.cdc_pending_events {
3464            let events: Vec<crate::cdc::ChangeEvent> = pending.lock().drain(..).collect();
3465            self.cdc_log.record_batch(events.into_iter().map(|mut e| {
3466                e.epoch = commit_epoch;
3467                e
3468            }));
3469        }
3470
3471        // Log transaction commit and epoch advance to WAL so that crash
3472        // recovery can identify committed transactions and their epoch
3473        // boundaries. Without these markers, WAL recovery discards all
3474        // records as uncommitted (fixes #252 for the crash scenario).
3475        #[cfg(feature = "wal")]
3476        if let Some(ref wal) = self.wal {
3477            use grafeo_storage::wal::WalRecord;
3478            if let Err(e) = wal.log(&WalRecord::TransactionCommit { transaction_id }) {
3479                grafeo_warn!("Failed to log transaction commit to WAL: {}", e);
3480            }
3481            if let Err(e) = wal.log(&WalRecord::EpochAdvance {
3482                epoch: commit_epoch,
3483            }) {
3484                grafeo_warn!("Failed to log epoch advance to WAL: {}", e);
3485            }
3486        }
3487
3488        // Sync epoch for all touched graphs so that convenience lookups
3489        // (edge_type, get_edge, get_node) can see versions at the latest epoch.
3490        let current_epoch = self.transaction_manager.current_epoch();
3491        for graph_name in &touched {
3492            let store = self.resolve_store(graph_name);
3493            store.sync_epoch(current_epoch);
3494        }
3495
3496        // Reset read-only flag, clear savepoints and touched graphs
3497        *self.read_only_tx.lock() = self.db_read_only;
3498        self.savepoints.lock().clear();
3499        self.touched_graphs.lock().clear();
3500
3501        // Auto-GC: periodically prune old MVCC versions
3502        if self.gc_interval > 0 {
3503            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
3504            if count.is_multiple_of(self.gc_interval) {
3505                let min_epoch = self.transaction_manager.min_active_epoch();
3506                for graph_name in &touched {
3507                    let store = self.resolve_store(graph_name);
3508                    store.gc_versions(min_epoch);
3509                }
3510                self.transaction_manager.gc();
3511                #[cfg(feature = "metrics")]
3512                crate::metrics::record_metric!(self.metrics, gc_runs, inc);
3513            }
3514        }
3515
3516        #[cfg(feature = "metrics")]
3517        {
3518            crate::metrics::record_metric!(self.metrics, tx_active, dec);
3519            crate::metrics::record_metric!(self.metrics, tx_committed, inc);
3520            #[cfg(not(target_arch = "wasm32"))]
3521            if let Some(start) = self.tx_start_time.lock().take() {
3522                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3523                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3524            }
3525        }
3526
3527        Ok(())
3528    }
3529
3530    /// Aborts the current transaction.
3531    ///
3532    /// Discards all changes since [`begin_transaction`](Self::begin_transaction).
3533    ///
3534    /// # Errors
3535    ///
3536    /// Returns an error if no transaction is active.
3537    ///
3538    /// # Examples
3539    ///
3540    /// ```no_run
3541    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3542    /// use grafeo_engine::GrafeoDB;
3543    ///
3544    /// let db = GrafeoDB::new_in_memory();
3545    /// let mut session = db.session();
3546    ///
3547    /// session.begin_transaction()?;
3548    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3549    /// session.rollback()?; // Insert is discarded
3550    /// # Ok(())
3551    /// # }
3552    /// ```
3553    #[cfg(feature = "lpg")]
3554    pub fn rollback(&mut self) -> Result<()> {
3555        self.rollback_inner()
3556    }
3557
3558    /// Core rollback logic, usable from both `&mut self` and `&self` paths.
3559    #[cfg(feature = "lpg")]
3560    fn rollback_inner(&self) -> Result<()> {
3561        let _span = grafeo_debug_span!("grafeo::tx::rollback");
3562        // Nested transaction: rollback to the auto-savepoint.
3563        {
3564            let mut depth = self.transaction_nesting_depth.lock();
3565            if *depth > 0 {
3566                let sp_name = format!("_nested_tx_{depth}");
3567                *depth -= 1;
3568                drop(depth);
3569                return self.rollback_to_savepoint(&sp_name);
3570            }
3571        }
3572
3573        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3574            grafeo_common::utils::error::Error::Transaction(
3575                grafeo_common::utils::error::TransactionError::InvalidState(
3576                    "No active transaction".to_string(),
3577                ),
3578            )
3579        })?;
3580
3581        // Reset read-only flag
3582        *self.read_only_tx.lock() = self.db_read_only;
3583
3584        // Discard uncommitted versions in ALL touched LPG stores (cross-graph atomicity).
3585        let touched = self.touched_graphs.lock().clone();
3586        for graph_name in &touched {
3587            let store = self.resolve_store(graph_name);
3588            store.discard_uncommitted_versions(transaction_id);
3589        }
3590
3591        // Discard pending operations in the RDF store
3592        #[cfg(feature = "triple-store")]
3593        self.rollback_rdf_transaction(transaction_id);
3594
3595        // Discard buffered CDC events on rollback
3596        #[cfg(feature = "cdc")]
3597        if let Some(ref pending) = self.cdc_pending_events {
3598            pending.lock().clear();
3599        }
3600
3601        // Clear savepoints and touched graphs
3602        self.savepoints.lock().clear();
3603        self.touched_graphs.lock().clear();
3604
3605        // Mark transaction as aborted in the manager
3606        let result = self.transaction_manager.abort(transaction_id);
3607
3608        #[cfg(feature = "metrics")]
3609        if result.is_ok() {
3610            crate::metrics::record_metric!(self.metrics, tx_active, dec);
3611            crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
3612            #[cfg(not(target_arch = "wasm32"))]
3613            if let Some(start) = self.tx_start_time.lock().take() {
3614                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3615                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3616            }
3617        }
3618
3619        result
3620    }
3621
3622    /// Creates a named savepoint within the current transaction.
3623    ///
3624    /// The savepoint captures the current node/edge ID counters so that
3625    /// [`rollback_to_savepoint`](Self::rollback_to_savepoint) can discard
3626    /// entities created after this point.
3627    ///
3628    /// # Errors
3629    ///
3630    /// Returns an error if no transaction is active.
3631    #[cfg(feature = "lpg")]
3632    pub fn savepoint(&self, name: &str) -> Result<()> {
3633        let tx_id = self.current_transaction.lock().ok_or_else(|| {
3634            grafeo_common::utils::error::Error::Transaction(
3635                grafeo_common::utils::error::TransactionError::InvalidState(
3636                    "No active transaction".to_string(),
3637                ),
3638            )
3639        })?;
3640
3641        // Capture state for every graph touched so far.
3642        let touched = self.touched_graphs.lock().clone();
3643        let graph_snapshots: Vec<GraphSavepoint> = touched
3644            .iter()
3645            .map(|graph_name| {
3646                let store = self.resolve_store(graph_name);
3647                GraphSavepoint {
3648                    graph_name: graph_name.clone(),
3649                    next_node_id: store.peek_next_node_id(),
3650                    next_edge_id: store.peek_next_edge_id(),
3651                    undo_log_position: store.property_undo_log_position(tx_id),
3652                }
3653            })
3654            .collect();
3655
3656        self.savepoints.lock().push(SavepointState {
3657            name: name.to_string(),
3658            graph_snapshots,
3659            active_graph: self.current_graph.lock().clone(),
3660            #[cfg(feature = "cdc")]
3661            cdc_event_position: self
3662                .cdc_pending_events
3663                .as_ref()
3664                .map_or(0, |p| p.lock().len()),
3665        });
3666        Ok(())
3667    }
3668
3669    /// Rolls back to a named savepoint, undoing all writes made after it.
3670    ///
3671    /// The savepoint and any savepoints created after it are removed.
3672    /// Entities with IDs >= the savepoint snapshot are discarded.
3673    ///
3674    /// # Errors
3675    ///
3676    /// Returns an error if no transaction is active or the savepoint does not exist.
3677    #[cfg(feature = "lpg")]
3678    pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
3679        let transaction_id = self.current_transaction.lock().ok_or_else(|| {
3680            grafeo_common::utils::error::Error::Transaction(
3681                grafeo_common::utils::error::TransactionError::InvalidState(
3682                    "No active transaction".to_string(),
3683                ),
3684            )
3685        })?;
3686
3687        let mut savepoints = self.savepoints.lock();
3688
3689        // Find the savepoint by name (search from the end for nested savepoints)
3690        let pos = savepoints
3691            .iter()
3692            .rposition(|sp| sp.name == name)
3693            .ok_or_else(|| {
3694                grafeo_common::utils::error::Error::Transaction(
3695                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
3696                        "Savepoint '{name}' not found"
3697                    )),
3698                )
3699            })?;
3700
3701        let sp_state = savepoints[pos].clone();
3702
3703        // Remove this savepoint and all later ones
3704        savepoints.truncate(pos);
3705        drop(savepoints);
3706
3707        // Roll back each graph that was captured in the savepoint.
3708        for gs in &sp_state.graph_snapshots {
3709            let store = self.resolve_store(&gs.graph_name);
3710
3711            // Replay property/label undo entries recorded after the savepoint
3712            store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);
3713
3714            // Discard entities created after the savepoint
3715            let current_next_node = store.peek_next_node_id();
3716            let current_next_edge = store.peek_next_edge_id();
3717
3718            let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
3719                .map(NodeId::new)
3720                .collect();
3721            let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
3722                .map(EdgeId::new)
3723                .collect();
3724
3725            if !node_ids.is_empty() || !edge_ids.is_empty() {
3726                store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
3727            }
3728        }
3729
3730        // Also roll back any graphs that were touched AFTER the savepoint
3731        // but not captured in it. These need full discard since the savepoint
3732        // didn't include them.
3733        let touched = self.touched_graphs.lock().clone();
3734        for graph_name in &touched {
3735            let already_captured = sp_state
3736                .graph_snapshots
3737                .iter()
3738                .any(|gs| gs.graph_name == *graph_name);
3739            if !already_captured {
3740                let store = self.resolve_store(graph_name);
3741                store.discard_uncommitted_versions(transaction_id);
3742            }
3743        }
3744
3745        // Truncate CDC event buffer to the savepoint position.
3746        #[cfg(feature = "cdc")]
3747        if let Some(ref pending) = self.cdc_pending_events {
3748            pending.lock().truncate(sp_state.cdc_event_position);
3749        }
3750
3751        // Restore touched_graphs to only the graphs that were known at savepoint time.
3752        let mut touched = self.touched_graphs.lock();
3753        touched.clear();
3754        for gs in &sp_state.graph_snapshots {
3755            if !touched.contains(&gs.graph_name) {
3756                touched.push(gs.graph_name.clone());
3757            }
3758        }
3759
3760        Ok(())
3761    }
3762
3763    /// Releases (removes) a named savepoint without rolling back.
3764    ///
3765    /// # Errors
3766    ///
3767    /// Returns an error if no transaction is active or the savepoint does not exist.
3768    pub fn release_savepoint(&self, name: &str) -> Result<()> {
3769        let _tx_id = self.current_transaction.lock().ok_or_else(|| {
3770            grafeo_common::utils::error::Error::Transaction(
3771                grafeo_common::utils::error::TransactionError::InvalidState(
3772                    "No active transaction".to_string(),
3773                ),
3774            )
3775        })?;
3776
3777        let mut savepoints = self.savepoints.lock();
3778        let pos = savepoints
3779            .iter()
3780            .rposition(|sp| sp.name == name)
3781            .ok_or_else(|| {
3782                grafeo_common::utils::error::Error::Transaction(
3783                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
3784                        "Savepoint '{name}' not found"
3785                    )),
3786                )
3787            })?;
3788        savepoints.remove(pos);
3789        Ok(())
3790    }
3791
3792    /// Returns whether a transaction is active.
3793    #[must_use]
3794    pub fn in_transaction(&self) -> bool {
3795        self.current_transaction.lock().is_some()
3796    }
3797
3798    /// Returns the current transaction ID, if any.
3799    #[must_use]
3800    pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
3801        *self.current_transaction.lock()
3802    }
3803
3804    /// Returns a reference to the transaction manager.
3805    #[must_use]
3806    pub(crate) fn transaction_manager(&self) -> &TransactionManager {
3807        &self.transaction_manager
3808    }
3809
3810    /// Returns the store's current node count and the count at transaction start.
3811    #[cfg(feature = "lpg")]
3812    #[must_use]
3813    pub(crate) fn node_count_delta(&self) -> (usize, usize) {
3814        (
3815            self.transaction_start_node_count.load(Ordering::Relaxed),
3816            self.active_lpg_store().node_count(),
3817        )
3818    }
3819
3820    /// Returns the store's current edge count and the count at transaction start.
3821    #[cfg(feature = "lpg")]
3822    #[must_use]
3823    pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
3824        (
3825            self.transaction_start_edge_count.load(Ordering::Relaxed),
3826            self.active_lpg_store().edge_count(),
3827        )
3828    }
3829
3830    /// Prepares the current transaction for a two-phase commit.
3831    ///
3832    /// Returns a [`PreparedCommit`](crate::transaction::PreparedCommit) that
3833    /// lets you inspect pending changes and attach metadata before finalizing.
3834    /// The mutable borrow prevents concurrent operations while the commit is
3835    /// pending.
3836    ///
3837    /// If the `PreparedCommit` is dropped without calling `commit()` or
3838    /// `abort()`, the transaction is automatically rolled back.
3839    ///
3840    /// # Errors
3841    ///
3842    /// Returns an error if no transaction is active.
3843    ///
3844    /// # Examples
3845    ///
3846    /// ```no_run
3847    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3848    /// use grafeo_engine::GrafeoDB;
3849    ///
3850    /// let db = GrafeoDB::new_in_memory();
3851    /// let mut session = db.session();
3852    ///
3853    /// session.begin_transaction()?;
3854    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3855    ///
3856    /// let mut prepared = session.prepare_commit()?;
3857    /// println!("Nodes written: {}", prepared.info().nodes_written);
3858    /// prepared.set_metadata("audit_user", "admin");
3859    /// prepared.commit()?;
3860    /// # Ok(())
3861    /// # }
3862    /// ```
3863    #[cfg(feature = "lpg")]
3864    pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
3865        crate::transaction::PreparedCommit::new(self)
3866    }
3867
3868    /// Sets auto-commit mode.
3869    pub fn set_auto_commit(&mut self, auto_commit: bool) {
3870        self.auto_commit = auto_commit;
3871    }
3872
3873    /// Returns whether auto-commit is enabled.
3874    #[must_use]
3875    pub fn auto_commit(&self) -> bool {
3876        self.auto_commit
3877    }
3878
3879    /// Returns `true` if auto-commit should wrap this execution.
3880    ///
3881    /// Auto-commit kicks in when: the session is in auto-commit mode,
3882    /// no explicit transaction is active, and the query mutates data.
3883    fn needs_auto_commit(&self, has_mutations: bool) -> bool {
3884        self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
3885    }
3886
3887    /// Wraps `body` in an automatic begin/commit when [`needs_auto_commit`]
3888    /// returns `true`. On error the transaction is rolled back.
3889    #[cfg(feature = "lpg")]
3890    fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
3891    where
3892        F: FnOnce() -> Result<QueryResult>,
3893    {
3894        if self.needs_auto_commit(has_mutations) {
3895            self.begin_transaction_inner(false, None)?;
3896            match body() {
3897                Ok(result) => {
3898                    self.commit_inner()?;
3899                    Ok(result)
3900                }
3901                Err(e) => {
3902                    let _ = self.rollback_inner();
3903                    Err(e)
3904                }
3905            }
3906        } else {
3907            body()
3908        }
3909    }
3910
3911    /// Non-LPG stub: no auto-commit wrapping (SPARQL UPDATE is atomic).
3912    #[cfg(not(feature = "lpg"))]
3913    fn with_auto_commit<F>(&self, _has_mutations: bool, body: F) -> Result<QueryResult>
3914    where
3915        F: FnOnce() -> Result<QueryResult>,
3916    {
3917        body()
3918    }
3919
3920    /// Quick heuristic: returns `true` when the query text looks like it
3921    /// performs a mutation. Used by `_with_params` paths that go through the
3922    /// `QueryProcessor` (where the logical plan isn't available before
3923    /// execution). False negatives are harmless: the data just won't be
3924    /// auto-committed, which matches the prior behaviour.
3925    fn query_looks_like_mutation(query: &str) -> bool {
3926        let upper = query.to_ascii_uppercase();
3927        upper.contains("INSERT")
3928            || upper.contains("CREATE")
3929            || upper.contains("DELETE")
3930            || upper.contains("MERGE")
3931            || upper.contains("SET")
3932            || upper.contains("REMOVE")
3933            || upper.contains("DROP")
3934            || upper.contains("ALTER")
3935    }
3936
3937    /// Computes the wall-clock deadline for query execution.
3938    #[must_use]
3939    fn query_deadline(&self) -> Option<Instant> {
3940        #[cfg(not(target_arch = "wasm32"))]
3941        {
3942            self.query_timeout.map(|d| Instant::now() + d)
3943        }
3944        #[cfg(target_arch = "wasm32")]
3945        {
3946            let _ = &self.query_timeout;
3947            None
3948        }
3949    }
3950
3951    /// Records query metrics for any language.
3952    ///
3953    /// Called after query execution to update counters, latency histogram,
3954    /// and per-language tracking. `elapsed_ms` should be `None` on WASM
3955    /// where `Instant` is unavailable.
3956    #[cfg(feature = "metrics")]
3957    fn record_query_metrics(
3958        &self,
3959        language: &str,
3960        elapsed_ms: Option<f64>,
3961        result: &Result<crate::database::QueryResult>,
3962    ) {
3963        use crate::metrics::record_metric;
3964
3965        record_metric!(self.metrics, query_count, inc);
3966        if let Some(ref reg) = self.metrics {
3967            reg.query_count_by_language.increment(language);
3968        }
3969        if let Some(ms) = elapsed_ms {
3970            record_metric!(self.metrics, query_latency, observe ms);
3971        }
3972        match result {
3973            Ok(r) => {
3974                let returned = r.rows.len() as u64;
3975                record_metric!(self.metrics, rows_returned, add returned);
3976                if let Some(scanned) = r.rows_scanned {
3977                    record_metric!(self.metrics, rows_scanned, add scanned);
3978                }
3979            }
3980            Err(e) => {
3981                record_metric!(self.metrics, query_errors, inc);
3982                // Detect timeout errors
3983                let msg = e.to_string();
3984                if msg.contains("exceeded timeout") {
3985                    record_metric!(self.metrics, query_timeouts, inc);
3986                }
3987            }
3988        }
3989    }
3990
3991    /// Evaluates a simple integer literal from a session parameter expression.
3992    fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
3993        use grafeo_adapters::query::gql::ast::{Expression, Literal};
3994        match expr {
3995            Expression::Literal(Literal::Integer(n)) => Some(*n),
3996            _ => None,
3997        }
3998    }
3999
4000    /// Returns the current transaction context for MVCC visibility.
4001    ///
4002    /// Returns `(viewing_epoch, transaction_id)` where:
4003    /// - `viewing_epoch` is the epoch at which to check version visibility
4004    /// - `transaction_id` is the current transaction ID (if in a transaction)
4005    #[must_use]
4006    fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
4007        // Time-travel override takes precedence (read-only, no tx context)
4008        if let Some(epoch) = *self.viewing_epoch_override.lock() {
4009            return (epoch, None);
4010        }
4011
4012        if let Some(transaction_id) = *self.current_transaction.lock() {
4013            // In a transaction: use the transaction's start epoch
4014            let epoch = self
4015                .transaction_manager
4016                .start_epoch(transaction_id)
4017                .unwrap_or_else(|| self.transaction_manager.current_epoch());
4018            (epoch, Some(transaction_id))
4019        } else {
4020            // No transaction: use current epoch
4021            (self.transaction_manager.current_epoch(), None)
4022        }
4023    }
4024
4025    /// Creates a planner with transaction context and constraint validator.
4026    ///
4027    /// The `store` parameter is the graph store to plan against (use
4028    /// `self.active_store()` for graph-aware execution).
4029    fn create_planner_for_store(
4030        &self,
4031        store: Arc<dyn GraphStore>,
4032        viewing_epoch: EpochId,
4033        transaction_id: Option<TransactionId>,
4034    ) -> crate::query::Planner {
4035        self.create_planner_for_store_with_read_only(store, viewing_epoch, transaction_id, false)
4036    }
4037
4038    fn create_planner_for_store_with_read_only(
4039        &self,
4040        store: Arc<dyn GraphStore>,
4041        viewing_epoch: EpochId,
4042        transaction_id: Option<TransactionId>,
4043        read_only: bool,
4044    ) -> crate::query::Planner {
4045        use crate::query::Planner;
4046        use grafeo_core::execution::operators::{LazyValue, SessionContext};
4047
4048        // Capture store reference for lazy introspection (only computed if info()/schema() called).
4049        let info_store = Arc::clone(&store);
4050        let schema_store = Arc::clone(&store);
4051
4052        let session_context = SessionContext {
4053            current_schema: self.current_schema(),
4054            current_graph: self.current_graph(),
4055            db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
4056            schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
4057        };
4058
4059        let write_store = self.active_write_store();
4060
4061        let mut planner = Planner::with_context(
4062            Arc::clone(&store),
4063            write_store,
4064            Arc::clone(&self.transaction_manager),
4065            transaction_id,
4066            viewing_epoch,
4067        )
4068        .with_factorized_execution(self.factorized_execution)
4069        .with_catalog(Arc::clone(&self.catalog))
4070        .with_session_context(session_context)
4071        .with_read_only(read_only);
4072
4073        // Attach the constraint validator for schema enforcement
4074        let validator =
4075            CatalogConstraintValidator::new(Arc::clone(&self.catalog)).with_store(store);
4076        planner = planner.with_validator(Arc::new(validator));
4077
4078        planner
4079    }
4080
4081    /// Builds a `Value::Map` for the `info()` introspection function.
4082    fn build_info_value(store: &dyn GraphStore) -> Value {
4083        use grafeo_common::types::PropertyKey;
4084        use std::collections::BTreeMap;
4085
4086        let mut map = BTreeMap::new();
4087        map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
4088        map.insert(
4089            PropertyKey::from("node_count"),
4090            Value::Int64(store.node_count() as i64),
4091        );
4092        map.insert(
4093            PropertyKey::from("edge_count"),
4094            Value::Int64(store.edge_count() as i64),
4095        );
4096        map.insert(
4097            PropertyKey::from("version"),
4098            Value::String(env!("CARGO_PKG_VERSION").into()),
4099        );
4100        Value::Map(map.into())
4101    }
4102
4103    /// Builds a `Value::Map` for the `schema()` introspection function.
4104    fn build_schema_value(store: &dyn GraphStore) -> Value {
4105        use grafeo_common::types::PropertyKey;
4106        use std::collections::BTreeMap;
4107
4108        let labels: Vec<Value> = store
4109            .all_labels()
4110            .into_iter()
4111            .map(|l| Value::String(l.into()))
4112            .collect();
4113        let edge_types: Vec<Value> = store
4114            .all_edge_types()
4115            .into_iter()
4116            .map(|t| Value::String(t.into()))
4117            .collect();
4118        let property_keys: Vec<Value> = store
4119            .all_property_keys()
4120            .into_iter()
4121            .map(|k| Value::String(k.into()))
4122            .collect();
4123
4124        let mut map = BTreeMap::new();
4125        map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
4126        map.insert(
4127            PropertyKey::from("edge_types"),
4128            Value::List(edge_types.into()),
4129        );
4130        map.insert(
4131            PropertyKey::from("property_keys"),
4132            Value::List(property_keys.into()),
4133        );
4134        Value::Map(map.into())
4135    }
4136
4137    /// Creates a node directly (bypassing query execution).
4138    ///
4139    /// This is a low-level API for testing and direct manipulation.
4140    /// If a transaction is active, the node will be versioned with the transaction ID.
4141    #[cfg(feature = "lpg")]
4142    pub fn create_node(&self, labels: &[&str]) -> NodeId {
4143        let (epoch, transaction_id) = self.get_transaction_context();
4144        self.active_lpg_store().create_node_versioned(
4145            labels,
4146            epoch,
4147            transaction_id.unwrap_or(TransactionId::SYSTEM),
4148        )
4149    }
4150
4151    /// Creates a node with properties.
4152    ///
4153    /// If a transaction is active, the node will be versioned with the transaction ID.
4154    #[cfg(feature = "lpg")]
4155    pub fn create_node_with_props<'a>(
4156        &self,
4157        labels: &[&str],
4158        properties: impl IntoIterator<Item = (&'a str, Value)>,
4159    ) -> NodeId {
4160        let (epoch, transaction_id) = self.get_transaction_context();
4161        self.active_lpg_store().create_node_with_props_versioned(
4162            labels,
4163            properties,
4164            epoch,
4165            transaction_id.unwrap_or(TransactionId::SYSTEM),
4166        )
4167    }
4168
4169    /// Creates an edge between two nodes.
4170    ///
4171    /// This is a low-level API for testing and direct manipulation.
4172    /// If a transaction is active, the edge will be versioned with the transaction ID.
4173    #[cfg(feature = "lpg")]
4174    pub fn create_edge(
4175        &self,
4176        src: NodeId,
4177        dst: NodeId,
4178        edge_type: &str,
4179    ) -> grafeo_common::types::EdgeId {
4180        let (epoch, transaction_id) = self.get_transaction_context();
4181        self.active_lpg_store().create_edge_versioned(
4182            src,
4183            dst,
4184            edge_type,
4185            epoch,
4186            transaction_id.unwrap_or(TransactionId::SYSTEM),
4187        )
4188    }
4189
4190    /// Creates an edge with properties within the active transaction context.
4191    #[cfg(feature = "lpg")]
4192    pub fn create_edge_with_props<'a>(
4193        &self,
4194        src: NodeId,
4195        dst: NodeId,
4196        edge_type: &str,
4197        properties: impl IntoIterator<Item = (&'a str, Value)>,
4198    ) -> grafeo_common::types::EdgeId {
4199        let (epoch, transaction_id) = self.get_transaction_context();
4200        let tid = transaction_id.unwrap_or(TransactionId::SYSTEM);
4201        let store = self.active_lpg_store();
4202        let eid = store.create_edge_versioned(src, dst, edge_type, epoch, tid);
4203        for (key, value) in properties {
4204            store.set_edge_property_versioned(eid, key, value, tid);
4205        }
4206        eid
4207    }
4208
4209    /// Sets a node property within the active transaction context.
4210    #[cfg(feature = "lpg")]
4211    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
4212        let (_, transaction_id) = self.get_transaction_context();
4213        if let Some(tid) = transaction_id {
4214            self.active_lpg_store()
4215                .set_node_property_versioned(id, key, value, tid);
4216        } else {
4217            self.active_lpg_store().set_node_property(id, key, value);
4218        }
4219    }
4220
4221    /// Sets an edge property within the active transaction context.
4222    #[cfg(feature = "lpg")]
4223    pub fn set_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str, value: Value) {
4224        let (_, transaction_id) = self.get_transaction_context();
4225        if let Some(tid) = transaction_id {
4226            self.active_lpg_store()
4227                .set_edge_property_versioned(id, key, value, tid);
4228        } else {
4229            self.active_lpg_store().set_edge_property(id, key, value);
4230        }
4231    }
4232
4233    /// Deletes a node within the active transaction context.
4234    #[cfg(feature = "lpg")]
4235    pub fn delete_node(&self, id: NodeId) -> bool {
4236        let (epoch, transaction_id) = self.get_transaction_context();
4237        if let Some(tid) = transaction_id {
4238            self.active_lpg_store()
4239                .delete_node_versioned(id, epoch, tid)
4240        } else {
4241            self.active_lpg_store().delete_node(id)
4242        }
4243    }
4244
4245    /// Deletes an edge within the active transaction context.
4246    #[cfg(feature = "lpg")]
4247    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
4248        let (epoch, transaction_id) = self.get_transaction_context();
4249        if let Some(tid) = transaction_id {
4250            self.active_lpg_store()
4251                .delete_edge_versioned(id, epoch, tid)
4252        } else {
4253            self.active_lpg_store().delete_edge(id)
4254        }
4255    }
4256
4257    // =========================================================================
4258    // Direct Lookup APIs (bypass query planning for O(1) point reads)
4259    // =========================================================================
4260
4261    /// Gets a node by ID directly, bypassing query planning.
4262    ///
4263    /// This is the fastest way to retrieve a single node when you know its ID.
4264    /// Skips parsing, binding, optimization, and physical planning entirely.
4265    ///
4266    /// # Performance
4267    ///
4268    /// - Time complexity: O(1) average case
4269    /// - No lock contention (uses DashMap internally)
4270    /// - ~20-30x faster than equivalent MATCH query
4271    ///
4272    /// # Example
4273    ///
4274    /// ```no_run
4275    /// # use grafeo_engine::GrafeoDB;
4276    /// # let db = GrafeoDB::new_in_memory();
4277    /// let session = db.session();
4278    /// let node_id = session.create_node(&["Person"]);
4279    ///
4280    /// // Direct lookup - O(1), no query planning
4281    /// let node = session.get_node(node_id);
4282    /// assert!(node.is_some());
4283    /// ```
4284    #[cfg(feature = "lpg")]
4285    #[must_use]
4286    pub fn get_node(&self, id: NodeId) -> Option<Node> {
4287        let (epoch, transaction_id) = self.get_transaction_context();
4288        self.active_lpg_store().get_node_versioned(
4289            id,
4290            epoch,
4291            transaction_id.unwrap_or(TransactionId::SYSTEM),
4292        )
4293    }
4294
4295    /// Gets a single property from a node by ID, bypassing query planning.
4296    ///
4297    /// More efficient than `get_node()` when you only need one property,
4298    /// as it avoids loading the full node with all properties.
4299    ///
4300    /// # Performance
4301    ///
4302    /// - Time complexity: O(1) average case
4303    /// - No query planning overhead
4304    ///
4305    /// # Example
4306    ///
4307    /// ```no_run
4308    /// # use grafeo_engine::GrafeoDB;
4309    /// # use grafeo_common::types::Value;
4310    /// # let db = GrafeoDB::new_in_memory();
4311    /// let session = db.session();
4312    /// let id = session.create_node_with_props(&["Person"], [("name", "Alix".into())]);
4313    ///
4314    /// // Direct property access - O(1)
4315    /// let name = session.get_node_property(id, "name");
4316    /// assert_eq!(name, Some(Value::String("Alix".into())));
4317    /// ```
4318    #[cfg(feature = "lpg")]
4319    #[must_use]
4320    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
4321        self.get_node(id)
4322            .and_then(|node| node.get_property(key).cloned())
4323    }
4324
4325    /// Gets an edge by ID directly, bypassing query planning.
4326    ///
4327    /// # Performance
4328    ///
4329    /// - Time complexity: O(1) average case
4330    /// - No lock contention
4331    #[cfg(feature = "lpg")]
4332    #[must_use]
4333    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
4334        let (epoch, transaction_id) = self.get_transaction_context();
4335        self.active_lpg_store().get_edge_versioned(
4336            id,
4337            epoch,
4338            transaction_id.unwrap_or(TransactionId::SYSTEM),
4339        )
4340    }
4341
4342    /// Gets outgoing neighbors of a node directly, bypassing query planning.
4343    ///
4344    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
4345    ///
4346    /// # Performance
4347    ///
4348    /// - Time complexity: O(degree) where degree is the number of outgoing edges
4349    /// - Uses adjacency index for direct access
4350    /// - ~10-20x faster than equivalent MATCH query
4351    ///
4352    /// # Example
4353    ///
4354    /// ```no_run
4355    /// # use grafeo_engine::GrafeoDB;
4356    /// # let db = GrafeoDB::new_in_memory();
4357    /// let session = db.session();
4358    /// let alix = session.create_node(&["Person"]);
4359    /// let gus = session.create_node(&["Person"]);
4360    /// session.create_edge(alix, gus, "KNOWS");
4361    ///
4362    /// // Direct neighbor lookup - O(degree)
4363    /// let neighbors = session.get_neighbors_outgoing(alix);
4364    /// assert_eq!(neighbors.len(), 1);
4365    /// assert_eq!(neighbors[0].0, gus);
4366    /// ```
4367    #[cfg(feature = "lpg")]
4368    #[must_use]
4369    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4370        self.active_lpg_store()
4371            .edges_from(node, Direction::Outgoing)
4372            .collect()
4373    }
4374
4375    /// Gets incoming neighbors of a node directly, bypassing query planning.
4376    ///
4377    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
4378    ///
4379    /// # Performance
4380    ///
4381    /// - Time complexity: O(degree) where degree is the number of incoming edges
4382    /// - Uses backward adjacency index for direct access
4383    #[cfg(feature = "lpg")]
4384    #[must_use]
4385    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4386        self.active_lpg_store()
4387            .edges_from(node, Direction::Incoming)
4388            .collect()
4389    }
4390
4391    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
4392    ///
4393    /// # Example
4394    ///
4395    /// ```no_run
4396    /// # use grafeo_engine::GrafeoDB;
4397    /// # let db = GrafeoDB::new_in_memory();
4398    /// # let session = db.session();
4399    /// # let alix = session.create_node(&["Person"]);
4400    /// let neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
4401    /// ```
4402    #[cfg(feature = "lpg")]
4403    #[must_use]
4404    pub fn get_neighbors_outgoing_by_type(
4405        &self,
4406        node: NodeId,
4407        edge_type: &str,
4408    ) -> Vec<(NodeId, EdgeId)> {
4409        self.active_lpg_store()
4410            .edges_from(node, Direction::Outgoing)
4411            .filter(|(_, edge_id)| {
4412                self.get_edge(*edge_id)
4413                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
4414            })
4415            .collect()
4416    }
4417
4418    /// Checks if a node exists, bypassing query planning.
4419    ///
4420    /// # Performance
4421    ///
4422    /// - Time complexity: O(1)
4423    /// - Fastest existence check available
4424    #[cfg(feature = "lpg")]
4425    #[must_use]
4426    pub fn node_exists(&self, id: NodeId) -> bool {
4427        self.get_node(id).is_some()
4428    }
4429
4430    /// Checks if an edge exists, bypassing query planning.
4431    #[cfg(feature = "lpg")]
4432    #[must_use]
4433    pub fn edge_exists(&self, id: EdgeId) -> bool {
4434        self.get_edge(id).is_some()
4435    }
4436
4437    /// Gets the degree (number of edges) of a node.
4438    ///
4439    /// Returns (outgoing_degree, incoming_degree).
4440    #[cfg(feature = "lpg")]
4441    #[must_use]
4442    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
4443        let active = self.active_lpg_store();
4444        let out = active.out_degree(node);
4445        let in_degree = active.in_degree(node);
4446        (out, in_degree)
4447    }
4448
4449    /// Batch lookup of multiple nodes by ID.
4450    ///
4451    /// More efficient than calling `get_node()` in a loop because it
4452    /// amortizes overhead.
4453    ///
4454    /// # Performance
4455    ///
4456    /// - Time complexity: O(n) where n is the number of IDs
4457    /// - Better cache utilization than individual lookups
4458    #[cfg(feature = "lpg")]
4459    #[must_use]
4460    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
4461        let (epoch, transaction_id) = self.get_transaction_context();
4462        let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
4463        let active = self.active_lpg_store();
4464        ids.iter()
4465            .map(|&id| active.get_node_versioned(id, epoch, tx))
4466            .collect()
4467    }
4468
4469    // ── Change Data Capture ─────────────────────────────────────────────
4470
4471    /// Returns the full change history for an entity (node or edge).
4472    ///
4473    /// # Errors
4474    ///
4475    /// Currently infallible, but returns `Result` for forward compatibility.
4476    #[cfg(feature = "cdc")]
4477    pub fn history(
4478        &self,
4479        entity_id: impl Into<crate::cdc::EntityId>,
4480    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4481        Ok(self.cdc_log.history(entity_id.into()))
4482    }
4483
4484    /// Returns change events for an entity since the given epoch.
4485    ///
4486    /// # Errors
4487    ///
4488    /// Currently infallible, but returns `Result` for forward compatibility.
4489    #[cfg(feature = "cdc")]
4490    pub fn history_since(
4491        &self,
4492        entity_id: impl Into<crate::cdc::EntityId>,
4493        since_epoch: EpochId,
4494    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4495        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
4496    }
4497
4498    /// Returns all change events across all entities in an epoch range.
4499    ///
4500    /// # Errors
4501    ///
4502    /// Currently infallible, but returns `Result` for forward compatibility.
4503    #[cfg(feature = "cdc")]
4504    pub fn changes_between(
4505        &self,
4506        start_epoch: EpochId,
4507        end_epoch: EpochId,
4508    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4509        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
4510    }
4511}
4512
4513impl Drop for Session {
4514    fn drop(&mut self) {
4515        // Auto-rollback any active transaction to prevent leaked MVCC state,
4516        // dangling write locks, and uncommitted versions lingering in the store.
4517        #[cfg(feature = "lpg")]
4518        if self.in_transaction() {
4519            let _ = self.rollback_inner();
4520        }
4521
4522        #[cfg(feature = "metrics")]
4523        if let Some(ref reg) = self.metrics {
4524            reg.session_active
4525                .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
4526        }
4527    }
4528}
4529
4530#[cfg(test)]
4531mod tests {
4532    use super::parse_default_literal;
4533    use crate::database::GrafeoDB;
4534    use grafeo_common::types::Value;
4535
4536    // -----------------------------------------------------------------------
4537    // parse_default_literal
4538    // -----------------------------------------------------------------------
4539
4540    #[test]
4541    fn parse_default_literal_null() {
4542        assert_eq!(parse_default_literal("null"), Value::Null);
4543        assert_eq!(parse_default_literal("NULL"), Value::Null);
4544        assert_eq!(parse_default_literal("Null"), Value::Null);
4545    }
4546
4547    #[test]
4548    fn parse_default_literal_bool() {
4549        assert_eq!(parse_default_literal("true"), Value::Bool(true));
4550        assert_eq!(parse_default_literal("TRUE"), Value::Bool(true));
4551        assert_eq!(parse_default_literal("false"), Value::Bool(false));
4552        assert_eq!(parse_default_literal("FALSE"), Value::Bool(false));
4553    }
4554
4555    #[test]
4556    fn parse_default_literal_string_single_quoted() {
4557        assert_eq!(
4558            parse_default_literal("'hello'"),
4559            Value::String("hello".into())
4560        );
4561    }
4562
4563    #[test]
4564    fn parse_default_literal_string_double_quoted() {
4565        assert_eq!(
4566            parse_default_literal("\"world\""),
4567            Value::String("world".into())
4568        );
4569    }
4570
4571    #[test]
4572    fn parse_default_literal_integer() {
4573        assert_eq!(parse_default_literal("42"), Value::Int64(42));
4574        assert_eq!(parse_default_literal("-7"), Value::Int64(-7));
4575        assert_eq!(parse_default_literal("0"), Value::Int64(0));
4576    }
4577
4578    #[test]
4579    fn parse_default_literal_float() {
4580        assert_eq!(parse_default_literal("9.81"), Value::Float64(9.81_f64));
4581        assert_eq!(parse_default_literal("-0.5"), Value::Float64(-0.5));
4582    }
4583
4584    #[test]
4585    fn parse_default_literal_fallback_string() {
4586        // Not a recognized literal, not quoted, not a number
4587        assert_eq!(
4588            parse_default_literal("some_identifier"),
4589            Value::String("some_identifier".into())
4590        );
4591    }
4592
4593    #[test]
4594    fn test_session_create_node() {
4595        let db = GrafeoDB::new_in_memory();
4596        let session = db.session();
4597
4598        let id = session.create_node(&["Person"]);
4599        assert!(id.is_valid());
4600        assert_eq!(db.node_count(), 1);
4601    }
4602
4603    #[test]
4604    fn test_session_transaction() {
4605        let db = GrafeoDB::new_in_memory();
4606        let mut session = db.session();
4607
4608        assert!(!session.in_transaction());
4609
4610        session.begin_transaction().unwrap();
4611        assert!(session.in_transaction());
4612
4613        session.commit().unwrap();
4614        assert!(!session.in_transaction());
4615    }
4616
4617    #[test]
4618    fn test_session_transaction_context() {
4619        let db = GrafeoDB::new_in_memory();
4620        let mut session = db.session();
4621
4622        // Without transaction - context should have current epoch and no transaction_id
4623        let (_epoch1, transaction_id1) = session.get_transaction_context();
4624        assert!(transaction_id1.is_none());
4625
4626        // Start a transaction
4627        session.begin_transaction().unwrap();
4628        let (epoch2, transaction_id2) = session.get_transaction_context();
4629        assert!(transaction_id2.is_some());
4630        // Transaction should have a valid epoch
4631        let _ = epoch2; // Use the variable
4632
4633        // Commit and verify
4634        session.commit().unwrap();
4635        let (epoch3, tx_id3) = session.get_transaction_context();
4636        assert!(tx_id3.is_none());
4637        // Epoch should have advanced after commit
4638        assert!(epoch3.as_u64() >= epoch2.as_u64());
4639    }
4640
4641    #[test]
4642    fn test_session_rollback() {
4643        let db = GrafeoDB::new_in_memory();
4644        let mut session = db.session();
4645
4646        session.begin_transaction().unwrap();
4647        session.rollback().unwrap();
4648        assert!(!session.in_transaction());
4649    }
4650
4651    #[test]
4652    fn test_session_rollback_discards_versions() {
4653        use grafeo_common::types::TransactionId;
4654
4655        let db = GrafeoDB::new_in_memory();
4656
4657        // Create a node outside of any transaction (at system level)
4658        let node_before = db.store().create_node(&["Person"]);
4659        assert!(node_before.is_valid());
4660        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4661
4662        // Start a transaction
4663        let mut session = db.session();
4664        session.begin_transaction().unwrap();
4665        let transaction_id = session.current_transaction.lock().unwrap();
4666
4667        // Create a node versioned with the transaction's ID
4668        let epoch = db.store().current_epoch();
4669        let node_in_tx = db
4670            .store()
4671            .create_node_versioned(&["Person"], epoch, transaction_id);
4672        assert!(node_in_tx.is_valid());
4673
4674        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4675        // non-versioned lookups like node_count(). Verify the node is visible
4676        // only through the owning transaction.
4677        assert_eq!(
4678            db.node_count(),
4679            1,
4680            "PENDING nodes should be invisible to non-versioned node_count()"
4681        );
4682        assert!(
4683            db.store()
4684                .get_node_versioned(node_in_tx, epoch, transaction_id)
4685                .is_some(),
4686            "Transaction node should be visible to its own transaction"
4687        );
4688
4689        // Rollback the transaction
4690        session.rollback().unwrap();
4691        assert!(!session.in_transaction());
4692
4693        // The node created in the transaction should be discarded
4694        // Only the first node should remain visible
4695        let count_after = db.node_count();
4696        assert_eq!(
4697            count_after, 1,
4698            "Rollback should discard uncommitted node, but got {count_after}"
4699        );
4700
4701        // The original node should still be accessible
4702        let current_epoch = db.store().current_epoch();
4703        assert!(
4704            db.store()
4705                .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
4706                .is_some(),
4707            "Original node should still exist"
4708        );
4709
4710        // The node created in the transaction should not be accessible
4711        assert!(
4712            db.store()
4713                .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
4714                .is_none(),
4715            "Transaction node should be gone"
4716        );
4717    }
4718
4719    #[test]
4720    fn test_session_create_node_in_transaction() {
4721        // Test that session.create_node() is transaction-aware
4722        let db = GrafeoDB::new_in_memory();
4723
4724        // Create a node outside of any transaction
4725        let node_before = db.create_node(&["Person"]);
4726        assert!(node_before.is_valid());
4727        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4728
4729        // Start a transaction and create a node through the session
4730        let mut session = db.session();
4731        session.begin_transaction().unwrap();
4732        let transaction_id = session.current_transaction.lock().unwrap();
4733
4734        // Create a node through session.create_node() - should be versioned with tx
4735        let node_in_tx = session.create_node(&["Person"]);
4736        assert!(node_in_tx.is_valid());
4737
4738        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4739        // non-versioned lookups. Verify the node is visible only to its own tx.
4740        assert_eq!(
4741            db.node_count(),
4742            1,
4743            "PENDING nodes should be invisible to non-versioned node_count()"
4744        );
4745        let epoch = db.store().current_epoch();
4746        assert!(
4747            db.store()
4748                .get_node_versioned(node_in_tx, epoch, transaction_id)
4749                .is_some(),
4750            "Transaction node should be visible to its own transaction"
4751        );
4752
4753        // Rollback the transaction
4754        session.rollback().unwrap();
4755
4756        // The node created via session.create_node() should be discarded
4757        let count_after = db.node_count();
4758        assert_eq!(
4759            count_after, 1,
4760            "Rollback should discard node created via session.create_node(), but got {count_after}"
4761        );
4762    }
4763
4764    #[test]
4765    fn test_session_create_node_with_props_in_transaction() {
4766        use grafeo_common::types::Value;
4767
4768        // Test that session.create_node_with_props() is transaction-aware
4769        let db = GrafeoDB::new_in_memory();
4770
4771        // Create a node outside of any transaction
4772        db.create_node(&["Person"]);
4773        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4774
4775        // Start a transaction and create a node with properties
4776        let mut session = db.session();
4777        session.begin_transaction().unwrap();
4778        let transaction_id = session.current_transaction.lock().unwrap();
4779
4780        let node_in_tx =
4781            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4782        assert!(node_in_tx.is_valid());
4783
4784        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4785        // non-versioned lookups. Verify the node is visible only to its own tx.
4786        assert_eq!(
4787            db.node_count(),
4788            1,
4789            "PENDING nodes should be invisible to non-versioned node_count()"
4790        );
4791        let epoch = db.store().current_epoch();
4792        assert!(
4793            db.store()
4794                .get_node_versioned(node_in_tx, epoch, transaction_id)
4795                .is_some(),
4796            "Transaction node should be visible to its own transaction"
4797        );
4798
4799        // Rollback the transaction
4800        session.rollback().unwrap();
4801
4802        // The node should be discarded
4803        let count_after = db.node_count();
4804        assert_eq!(
4805            count_after, 1,
4806            "Rollback should discard node created via session.create_node_with_props()"
4807        );
4808    }
4809
4810    #[cfg(feature = "gql")]
4811    mod gql_tests {
4812        use super::*;
4813
4814        #[test]
4815        fn test_gql_query_execution() {
4816            let db = GrafeoDB::new_in_memory();
4817            let session = db.session();
4818
4819            // Create some test data
4820            session.create_node(&["Person"]);
4821            session.create_node(&["Person"]);
4822            session.create_node(&["Animal"]);
4823
4824            // Execute a GQL query
4825            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
4826
4827            // Should return 2 Person nodes
4828            assert_eq!(result.row_count(), 2);
4829            assert_eq!(result.column_count(), 1);
4830            assert_eq!(result.columns[0], "n");
4831        }
4832
4833        #[test]
4834        fn test_gql_empty_result() {
4835            let db = GrafeoDB::new_in_memory();
4836            let session = db.session();
4837
4838            // No data in database
4839            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
4840
4841            assert_eq!(result.row_count(), 0);
4842        }
4843
4844        #[test]
4845        fn test_gql_parse_error() {
4846            let db = GrafeoDB::new_in_memory();
4847            let session = db.session();
4848
4849            // Invalid GQL syntax
4850            let result = session.execute("MATCH (n RETURN n");
4851
4852            assert!(result.is_err());
4853        }
4854
4855        #[test]
4856        fn test_gql_relationship_traversal() {
4857            let db = GrafeoDB::new_in_memory();
4858            let session = db.session();
4859
4860            // Create a graph: Alix -> Gus, Alix -> Vincent
4861            let alix = session.create_node(&["Person"]);
4862            let gus = session.create_node(&["Person"]);
4863            let vincent = session.create_node(&["Person"]);
4864
4865            session.create_edge(alix, gus, "KNOWS");
4866            session.create_edge(alix, vincent, "KNOWS");
4867
4868            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
4869            let result = session
4870                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4871                .unwrap();
4872
4873            // Should return 2 rows (Alix->Gus, Alix->Vincent)
4874            assert_eq!(result.row_count(), 2);
4875            assert_eq!(result.column_count(), 2);
4876            assert_eq!(result.columns[0], "a");
4877            assert_eq!(result.columns[1], "b");
4878        }
4879
4880        #[test]
4881        fn test_gql_relationship_with_type_filter() {
4882            let db = GrafeoDB::new_in_memory();
4883            let session = db.session();
4884
4885            // Create a graph: Alix -KNOWS-> Gus, Alix -WORKS_WITH-> Vincent
4886            let alix = session.create_node(&["Person"]);
4887            let gus = session.create_node(&["Person"]);
4888            let vincent = session.create_node(&["Person"]);
4889
4890            session.create_edge(alix, gus, "KNOWS");
4891            session.create_edge(alix, vincent, "WORKS_WITH");
4892
4893            // Query only KNOWS relationships
4894            let result = session
4895                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4896                .unwrap();
4897
4898            // Should return only 1 row (Alix->Gus)
4899            assert_eq!(result.row_count(), 1);
4900        }
4901
4902        #[test]
4903        fn test_gql_semantic_error_undefined_variable() {
4904            let db = GrafeoDB::new_in_memory();
4905            let session = db.session();
4906
4907            // Reference undefined variable 'x' in RETURN
4908            let result = session.execute("MATCH (n:Person) RETURN x");
4909
4910            // Should fail with semantic error
4911            assert!(result.is_err());
4912            let Err(err) = result else {
4913                panic!("Expected error")
4914            };
4915            assert!(
4916                err.to_string().contains("Undefined variable"),
4917                "Expected undefined variable error, got: {}",
4918                err
4919            );
4920        }
4921
4922        #[test]
4923        fn test_gql_where_clause_property_filter() {
4924            use grafeo_common::types::Value;
4925
4926            let db = GrafeoDB::new_in_memory();
4927            let session = db.session();
4928
4929            // Create people with ages
4930            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
4931            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
4932            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
4933
4934            // Query with WHERE clause: age > 30
4935            let result = session
4936                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
4937                .unwrap();
4938
4939            // Should return 2 people (ages 35 and 45)
4940            assert_eq!(result.row_count(), 2);
4941        }
4942
4943        #[test]
4944        fn test_gql_where_clause_equality() {
4945            use grafeo_common::types::Value;
4946
4947            let db = GrafeoDB::new_in_memory();
4948            let session = db.session();
4949
4950            // Create people with names
4951            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4952            session.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
4953            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4954
4955            // Query with WHERE clause: name = "Alix"
4956            let result = session
4957                .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
4958                .unwrap();
4959
4960            // Should return 2 people named Alix
4961            assert_eq!(result.row_count(), 2);
4962        }
4963
4964        #[test]
4965        fn test_gql_return_property_access() {
4966            use grafeo_common::types::Value;
4967
4968            let db = GrafeoDB::new_in_memory();
4969            let session = db.session();
4970
4971            // Create people with names and ages
4972            session.create_node_with_props(
4973                &["Person"],
4974                [
4975                    ("name", Value::String("Alix".into())),
4976                    ("age", Value::Int64(30)),
4977                ],
4978            );
4979            session.create_node_with_props(
4980                &["Person"],
4981                [
4982                    ("name", Value::String("Gus".into())),
4983                    ("age", Value::Int64(25)),
4984                ],
4985            );
4986
4987            // Query returning properties
4988            let result = session
4989                .execute("MATCH (n:Person) RETURN n.name, n.age")
4990                .unwrap();
4991
4992            // Should return 2 rows with name and age columns
4993            assert_eq!(result.row_count(), 2);
4994            assert_eq!(result.column_count(), 2);
4995            assert_eq!(result.columns[0], "n.name");
4996            assert_eq!(result.columns[1], "n.age");
4997
4998            // Check that we get actual values
4999            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
5000            assert!(names.contains(&&Value::String("Alix".into())));
5001            assert!(names.contains(&&Value::String("Gus".into())));
5002        }
5003
5004        #[test]
5005        fn test_gql_return_mixed_expressions() {
5006            use grafeo_common::types::Value;
5007
5008            let db = GrafeoDB::new_in_memory();
5009            let session = db.session();
5010
5011            // Create a person
5012            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
5013
5014            // Query returning both node and property
5015            let result = session
5016                .execute("MATCH (n:Person) RETURN n, n.name")
5017                .unwrap();
5018
5019            assert_eq!(result.row_count(), 1);
5020            assert_eq!(result.column_count(), 2);
5021            assert_eq!(result.columns[0], "n");
5022            assert_eq!(result.columns[1], "n.name");
5023
5024            // Second column should be the name
5025            assert_eq!(result.rows[0][1], Value::String("Alix".into()));
5026        }
5027    }
5028
5029    #[cfg(feature = "cypher")]
5030    mod cypher_tests {
5031        use super::*;
5032
5033        #[test]
5034        fn test_cypher_query_execution() {
5035            let db = GrafeoDB::new_in_memory();
5036            let session = db.session();
5037
5038            // Create some test data
5039            session.create_node(&["Person"]);
5040            session.create_node(&["Person"]);
5041            session.create_node(&["Animal"]);
5042
5043            // Execute a Cypher query
5044            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
5045
5046            // Should return 2 Person nodes
5047            assert_eq!(result.row_count(), 2);
5048            assert_eq!(result.column_count(), 1);
5049            assert_eq!(result.columns[0], "n");
5050        }
5051
5052        #[test]
5053        fn test_cypher_empty_result() {
5054            let db = GrafeoDB::new_in_memory();
5055            let session = db.session();
5056
5057            // No data in database
5058            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
5059
5060            assert_eq!(result.row_count(), 0);
5061        }
5062
5063        #[test]
5064        fn test_cypher_parse_error() {
5065            let db = GrafeoDB::new_in_memory();
5066            let session = db.session();
5067
5068            // Invalid Cypher syntax
5069            let result = session.execute_cypher("MATCH (n RETURN n");
5070
5071            assert!(result.is_err());
5072        }
5073    }
5074
5075    // ==================== Direct Lookup API Tests ====================
5076
5077    mod direct_lookup_tests {
5078        use super::*;
5079        use grafeo_common::types::Value;
5080
5081        #[test]
5082        fn test_get_node() {
5083            let db = GrafeoDB::new_in_memory();
5084            let session = db.session();
5085
5086            let id = session.create_node(&["Person"]);
5087            let node = session.get_node(id);
5088
5089            assert!(node.is_some());
5090            let node = node.unwrap();
5091            assert_eq!(node.id, id);
5092        }
5093
5094        #[test]
5095        fn test_get_node_not_found() {
5096            use grafeo_common::types::NodeId;
5097
5098            let db = GrafeoDB::new_in_memory();
5099            let session = db.session();
5100
5101            // Try to get a non-existent node
5102            let node = session.get_node(NodeId::new(9999));
5103            assert!(node.is_none());
5104        }
5105
5106        #[test]
5107        fn test_get_node_property() {
5108            let db = GrafeoDB::new_in_memory();
5109            let session = db.session();
5110
5111            let id = session
5112                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
5113
5114            let name = session.get_node_property(id, "name");
5115            assert_eq!(name, Some(Value::String("Alix".into())));
5116
5117            // Non-existent property
5118            let missing = session.get_node_property(id, "missing");
5119            assert!(missing.is_none());
5120        }
5121
5122        #[test]
5123        fn test_get_edge() {
5124            let db = GrafeoDB::new_in_memory();
5125            let session = db.session();
5126
5127            let alix = session.create_node(&["Person"]);
5128            let gus = session.create_node(&["Person"]);
5129            let edge_id = session.create_edge(alix, gus, "KNOWS");
5130
5131            let edge = session.get_edge(edge_id);
5132            assert!(edge.is_some());
5133            let edge = edge.unwrap();
5134            assert_eq!(edge.id, edge_id);
5135            assert_eq!(edge.src, alix);
5136            assert_eq!(edge.dst, gus);
5137        }
5138
5139        #[test]
5140        fn test_get_edge_not_found() {
5141            use grafeo_common::types::EdgeId;
5142
5143            let db = GrafeoDB::new_in_memory();
5144            let session = db.session();
5145
5146            let edge = session.get_edge(EdgeId::new(9999));
5147            assert!(edge.is_none());
5148        }
5149
5150        #[test]
5151        fn test_get_neighbors_outgoing() {
5152            let db = GrafeoDB::new_in_memory();
5153            let session = db.session();
5154
5155            let alix = session.create_node(&["Person"]);
5156            let gus = session.create_node(&["Person"]);
5157            let harm = session.create_node(&["Person"]);
5158
5159            session.create_edge(alix, gus, "KNOWS");
5160            session.create_edge(alix, harm, "KNOWS");
5161
5162            let neighbors = session.get_neighbors_outgoing(alix);
5163            assert_eq!(neighbors.len(), 2);
5164
5165            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5166            assert!(neighbor_ids.contains(&gus));
5167            assert!(neighbor_ids.contains(&harm));
5168        }
5169
5170        #[test]
5171        fn test_get_neighbors_incoming() {
5172            let db = GrafeoDB::new_in_memory();
5173            let session = db.session();
5174
5175            let alix = session.create_node(&["Person"]);
5176            let gus = session.create_node(&["Person"]);
5177            let harm = session.create_node(&["Person"]);
5178
5179            session.create_edge(gus, alix, "KNOWS");
5180            session.create_edge(harm, alix, "KNOWS");
5181
5182            let neighbors = session.get_neighbors_incoming(alix);
5183            assert_eq!(neighbors.len(), 2);
5184
5185            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5186            assert!(neighbor_ids.contains(&gus));
5187            assert!(neighbor_ids.contains(&harm));
5188        }
5189
5190        #[test]
5191        fn test_get_neighbors_outgoing_by_type() {
5192            let db = GrafeoDB::new_in_memory();
5193            let session = db.session();
5194
5195            let alix = session.create_node(&["Person"]);
5196            let gus = session.create_node(&["Person"]);
5197            let company = session.create_node(&["Company"]);
5198
5199            session.create_edge(alix, gus, "KNOWS");
5200            session.create_edge(alix, company, "WORKS_AT");
5201
5202            let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
5203            assert_eq!(knows_neighbors.len(), 1);
5204            assert_eq!(knows_neighbors[0].0, gus);
5205
5206            let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
5207            assert_eq!(works_neighbors.len(), 1);
5208            assert_eq!(works_neighbors[0].0, company);
5209
5210            // No edges of this type
5211            let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
5212            assert!(no_neighbors.is_empty());
5213        }
5214
5215        #[test]
5216        fn test_node_exists() {
5217            use grafeo_common::types::NodeId;
5218
5219            let db = GrafeoDB::new_in_memory();
5220            let session = db.session();
5221
5222            let id = session.create_node(&["Person"]);
5223
5224            assert!(session.node_exists(id));
5225            assert!(!session.node_exists(NodeId::new(9999)));
5226        }
5227
5228        #[test]
5229        fn test_edge_exists() {
5230            use grafeo_common::types::EdgeId;
5231
5232            let db = GrafeoDB::new_in_memory();
5233            let session = db.session();
5234
5235            let alix = session.create_node(&["Person"]);
5236            let gus = session.create_node(&["Person"]);
5237            let edge_id = session.create_edge(alix, gus, "KNOWS");
5238
5239            assert!(session.edge_exists(edge_id));
5240            assert!(!session.edge_exists(EdgeId::new(9999)));
5241        }
5242
5243        #[test]
5244        fn test_get_degree() {
5245            let db = GrafeoDB::new_in_memory();
5246            let session = db.session();
5247
5248            let alix = session.create_node(&["Person"]);
5249            let gus = session.create_node(&["Person"]);
5250            let harm = session.create_node(&["Person"]);
5251
5252            // Alix knows Gus and Harm (2 outgoing)
5253            session.create_edge(alix, gus, "KNOWS");
5254            session.create_edge(alix, harm, "KNOWS");
5255            // Gus knows Alix (1 incoming for Alix)
5256            session.create_edge(gus, alix, "KNOWS");
5257
5258            let (out_degree, in_degree) = session.get_degree(alix);
5259            assert_eq!(out_degree, 2);
5260            assert_eq!(in_degree, 1);
5261
5262            // Node with no edges
5263            let lonely = session.create_node(&["Person"]);
5264            let (out, in_deg) = session.get_degree(lonely);
5265            assert_eq!(out, 0);
5266            assert_eq!(in_deg, 0);
5267        }
5268
5269        #[test]
5270        fn test_get_nodes_batch() {
5271            let db = GrafeoDB::new_in_memory();
5272            let session = db.session();
5273
5274            let alix = session.create_node(&["Person"]);
5275            let gus = session.create_node(&["Person"]);
5276            let harm = session.create_node(&["Person"]);
5277
5278            let nodes = session.get_nodes_batch(&[alix, gus, harm]);
5279            assert_eq!(nodes.len(), 3);
5280            assert!(nodes[0].is_some());
5281            assert!(nodes[1].is_some());
5282            assert!(nodes[2].is_some());
5283
5284            // With non-existent node
5285            use grafeo_common::types::NodeId;
5286            let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
5287            assert_eq!(nodes_with_missing.len(), 3);
5288            assert!(nodes_with_missing[0].is_some());
5289            assert!(nodes_with_missing[1].is_none()); // Missing node
5290            assert!(nodes_with_missing[2].is_some());
5291        }
5292
5293        #[test]
5294        fn test_auto_commit_setting() {
5295            let db = GrafeoDB::new_in_memory();
5296            let mut session = db.session();
5297
5298            // Default is auto-commit enabled
5299            assert!(session.auto_commit());
5300
5301            session.set_auto_commit(false);
5302            assert!(!session.auto_commit());
5303
5304            session.set_auto_commit(true);
5305            assert!(session.auto_commit());
5306        }
5307
5308        #[test]
5309        fn test_transaction_double_begin_nests() {
5310            let db = GrafeoDB::new_in_memory();
5311            let mut session = db.session();
5312
5313            session.begin_transaction().unwrap();
5314            // Second begin_transaction creates a nested transaction (auto-savepoint)
5315            let result = session.begin_transaction();
5316            assert!(result.is_ok());
5317            // Commit the inner (releases savepoint)
5318            session.commit().unwrap();
5319            // Commit the outer
5320            session.commit().unwrap();
5321        }
5322
5323        #[test]
5324        fn test_commit_without_transaction_error() {
5325            let db = GrafeoDB::new_in_memory();
5326            let mut session = db.session();
5327
5328            let result = session.commit();
5329            assert!(result.is_err());
5330        }
5331
5332        #[test]
5333        fn test_rollback_without_transaction_error() {
5334            let db = GrafeoDB::new_in_memory();
5335            let mut session = db.session();
5336
5337            let result = session.rollback();
5338            assert!(result.is_err());
5339        }
5340
5341        #[test]
5342        fn test_create_edge_in_transaction() {
5343            let db = GrafeoDB::new_in_memory();
5344            let mut session = db.session();
5345
5346            // Create nodes outside transaction
5347            let alix = session.create_node(&["Person"]);
5348            let gus = session.create_node(&["Person"]);
5349
5350            // Create edge in transaction
5351            session.begin_transaction().unwrap();
5352            let edge_id = session.create_edge(alix, gus, "KNOWS");
5353
5354            // Edge should be visible in the transaction
5355            assert!(session.edge_exists(edge_id));
5356
5357            // Commit
5358            session.commit().unwrap();
5359
5360            // Edge should still be visible
5361            assert!(session.edge_exists(edge_id));
5362        }
5363
5364        #[test]
5365        fn test_neighbors_empty_node() {
5366            let db = GrafeoDB::new_in_memory();
5367            let session = db.session();
5368
5369            let lonely = session.create_node(&["Person"]);
5370
5371            assert!(session.get_neighbors_outgoing(lonely).is_empty());
5372            assert!(session.get_neighbors_incoming(lonely).is_empty());
5373            assert!(
5374                session
5375                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
5376                    .is_empty()
5377            );
5378        }
5379    }
5380
5381    #[test]
5382    fn test_auto_gc_triggers_on_commit_interval() {
5383        use crate::config::Config;
5384
5385        let config = Config::in_memory().with_gc_interval(2);
5386        let db = GrafeoDB::with_config(config).unwrap();
5387        let mut session = db.session();
5388
5389        // First commit: counter = 1, no GC (not a multiple of 2)
5390        session.begin_transaction().unwrap();
5391        session.create_node(&["A"]);
5392        session.commit().unwrap();
5393
5394        // Second commit: counter = 2, GC should trigger (multiple of 2)
5395        session.begin_transaction().unwrap();
5396        session.create_node(&["B"]);
5397        session.commit().unwrap();
5398
5399        // Verify the database is still functional after GC
5400        assert_eq!(db.node_count(), 2);
5401    }
5402
5403    #[test]
5404    fn test_query_timeout_config_propagates_to_session() {
5405        use crate::config::Config;
5406        use std::time::Duration;
5407
5408        let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
5409        let db = GrafeoDB::with_config(config).unwrap();
5410        let session = db.session();
5411
5412        // Verify the session has a query deadline (timeout was set)
5413        assert!(session.query_deadline().is_some());
5414    }
5415
5416    #[test]
5417    fn test_no_query_timeout_returns_no_deadline() {
5418        let db = GrafeoDB::new_in_memory();
5419        let session = db.session();
5420
5421        // Default config has no timeout
5422        assert!(session.query_deadline().is_none());
5423    }
5424
5425    #[test]
5426    fn test_graph_model_accessor() {
5427        use crate::config::GraphModel;
5428
5429        let db = GrafeoDB::new_in_memory();
5430        let session = db.session();
5431
5432        assert_eq!(session.graph_model(), GraphModel::Lpg);
5433    }
5434
5435    #[cfg(feature = "gql")]
5436    #[test]
5437    fn test_external_store_session() {
5438        use grafeo_core::graph::GraphStoreMut;
5439        use std::sync::Arc;
5440
5441        let config = crate::config::Config::in_memory();
5442        let store =
5443            Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
5444        let db = GrafeoDB::with_store(store, config).unwrap();
5445
5446        let mut session = db.session();
5447
5448        // Use an explicit transaction so that INSERT and MATCH share the same
5449        // transaction context. With PENDING epochs, uncommitted versions are
5450        // only visible to the owning transaction.
5451        session.begin_transaction().unwrap();
5452        session.execute("INSERT (:Test {name: 'hello'})").unwrap();
5453
5454        // Verify we can query through it within the same transaction
5455        let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
5456        assert_eq!(result.row_count(), 1);
5457
5458        session.commit().unwrap();
5459    }
5460
5461    // ==================== Session Command Tests ====================
5462
5463    #[cfg(feature = "gql")]
5464    mod session_command_tests {
5465        use super::*;
5466        use grafeo_common::types::Value;
5467
5468        #[test]
5469        fn test_use_graph_sets_current_graph() {
5470            let db = GrafeoDB::new_in_memory();
5471            let session = db.session();
5472
5473            // Create the graph first, then USE it
5474            session.execute("CREATE GRAPH mydb").unwrap();
5475            session.execute("USE GRAPH mydb").unwrap();
5476
5477            assert_eq!(session.current_graph(), Some("mydb".to_string()));
5478        }
5479
5480        #[test]
5481        fn test_use_graph_nonexistent_errors() {
5482            let db = GrafeoDB::new_in_memory();
5483            let session = db.session();
5484
5485            let result = session.execute("USE GRAPH doesnotexist");
5486            assert!(result.is_err());
5487            let err = result.unwrap_err().to_string();
5488            assert!(
5489                err.contains("does not exist"),
5490                "Expected 'does not exist' error, got: {err}"
5491            );
5492        }
5493
5494        #[test]
5495        fn test_use_graph_default_always_valid() {
5496            let db = GrafeoDB::new_in_memory();
5497            let session = db.session();
5498
5499            // "default" is always valid, even without CREATE GRAPH
5500            session.execute("USE GRAPH default").unwrap();
5501            assert_eq!(session.current_graph(), Some("default".to_string()));
5502        }
5503
5504        #[test]
5505        fn test_session_set_graph() {
5506            let db = GrafeoDB::new_in_memory();
5507            let session = db.session();
5508
5509            session.execute("CREATE GRAPH analytics").unwrap();
5510            session.execute("SESSION SET GRAPH analytics").unwrap();
5511            assert_eq!(session.current_graph(), Some("analytics".to_string()));
5512        }
5513
5514        #[test]
5515        fn test_session_set_graph_nonexistent_errors() {
5516            let db = GrafeoDB::new_in_memory();
5517            let session = db.session();
5518
5519            let result = session.execute("SESSION SET GRAPH nosuchgraph");
5520            assert!(result.is_err());
5521        }
5522
5523        #[test]
5524        fn test_session_set_time_zone() {
5525            let db = GrafeoDB::new_in_memory();
5526            let session = db.session();
5527
5528            assert_eq!(session.time_zone(), None);
5529
5530            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5531            assert_eq!(session.time_zone(), Some("UTC".to_string()));
5532
5533            session
5534                .execute("SESSION SET TIME ZONE 'America/New_York'")
5535                .unwrap();
5536            assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
5537        }
5538
5539        #[test]
5540        fn test_session_set_parameter() {
5541            let db = GrafeoDB::new_in_memory();
5542            let session = db.session();
5543
5544            session
5545                .execute("SESSION SET PARAMETER $timeout = 30")
5546                .unwrap();
5547
5548            // Parameter is stored (value is Null for now, since expression
5549            // evaluation is not yet wired up)
5550            assert!(session.get_parameter("timeout").is_some());
5551        }
5552
5553        #[test]
5554        fn test_session_reset_clears_all_state() {
5555            let db = GrafeoDB::new_in_memory();
5556            let session = db.session();
5557
5558            // Set various session state
5559            session.execute("CREATE GRAPH analytics").unwrap();
5560            session.execute("SESSION SET GRAPH analytics").unwrap();
5561            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5562            session
5563                .execute("SESSION SET PARAMETER $limit = 100")
5564                .unwrap();
5565
5566            // Verify state was set
5567            assert!(session.current_graph().is_some());
5568            assert!(session.time_zone().is_some());
5569            assert!(session.get_parameter("limit").is_some());
5570
5571            // Reset everything
5572            session.execute("SESSION RESET").unwrap();
5573
5574            assert_eq!(session.current_graph(), None);
5575            assert_eq!(session.time_zone(), None);
5576            assert!(session.get_parameter("limit").is_none());
5577        }
5578
5579        #[test]
5580        fn test_session_close_clears_state() {
5581            let db = GrafeoDB::new_in_memory();
5582            let session = db.session();
5583
5584            session.execute("CREATE GRAPH analytics").unwrap();
5585            session.execute("SESSION SET GRAPH analytics").unwrap();
5586            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5587
5588            session.execute("SESSION CLOSE").unwrap();
5589
5590            assert_eq!(session.current_graph(), None);
5591            assert_eq!(session.time_zone(), None);
5592        }
5593
5594        #[test]
5595        fn test_create_graph() {
5596            let db = GrafeoDB::new_in_memory();
5597            let session = db.session();
5598
5599            session.execute("CREATE GRAPH mydb").unwrap();
5600
5601            // Should be able to USE it now
5602            session.execute("USE GRAPH mydb").unwrap();
5603            assert_eq!(session.current_graph(), Some("mydb".to_string()));
5604        }
5605
5606        #[test]
5607        fn test_create_graph_duplicate_errors() {
5608            let db = GrafeoDB::new_in_memory();
5609            let session = db.session();
5610
5611            session.execute("CREATE GRAPH mydb").unwrap();
5612            let result = session.execute("CREATE GRAPH mydb");
5613
5614            assert!(result.is_err());
5615            let err = result.unwrap_err().to_string();
5616            assert!(
5617                err.contains("already exists"),
5618                "Expected 'already exists' error, got: {err}"
5619            );
5620        }
5621
5622        #[test]
5623        fn test_create_graph_if_not_exists() {
5624            let db = GrafeoDB::new_in_memory();
5625            let session = db.session();
5626
5627            session.execute("CREATE GRAPH mydb").unwrap();
5628            // Should succeed silently with IF NOT EXISTS
5629            session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
5630        }
5631
5632        #[test]
5633        fn test_drop_graph() {
5634            let db = GrafeoDB::new_in_memory();
5635            let session = db.session();
5636
5637            session.execute("CREATE GRAPH mydb").unwrap();
5638            session.execute("DROP GRAPH mydb").unwrap();
5639
5640            // Should no longer be usable
5641            let result = session.execute("USE GRAPH mydb");
5642            assert!(result.is_err());
5643        }
5644
5645        #[test]
5646        fn test_drop_graph_nonexistent_errors() {
5647            let db = GrafeoDB::new_in_memory();
5648            let session = db.session();
5649
5650            let result = session.execute("DROP GRAPH nosuchgraph");
5651            assert!(result.is_err());
5652            let err = result.unwrap_err().to_string();
5653            assert!(
5654                err.contains("does not exist"),
5655                "Expected 'does not exist' error, got: {err}"
5656            );
5657        }
5658
5659        #[test]
5660        fn test_drop_graph_if_exists() {
5661            let db = GrafeoDB::new_in_memory();
5662            let session = db.session();
5663
5664            // Should succeed silently with IF EXISTS
5665            session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
5666        }
5667
5668        #[test]
5669        fn test_start_transaction_via_gql() {
5670            let db = GrafeoDB::new_in_memory();
5671            let session = db.session();
5672
5673            session.execute("START TRANSACTION").unwrap();
5674            assert!(session.in_transaction());
5675            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5676            session.execute("COMMIT").unwrap();
5677            assert!(!session.in_transaction());
5678
5679            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5680            assert_eq!(result.rows.len(), 1);
5681        }
5682
5683        #[test]
5684        fn test_start_transaction_read_only_blocks_insert() {
5685            let db = GrafeoDB::new_in_memory();
5686            let session = db.session();
5687
5688            session.execute("START TRANSACTION READ ONLY").unwrap();
5689            let result = session.execute("INSERT (:Person {name: 'Alix'})");
5690            assert!(result.is_err());
5691            let err = result.unwrap_err().to_string();
5692            assert!(
5693                err.contains("read-only"),
5694                "Expected read-only error, got: {err}"
5695            );
5696            session.execute("ROLLBACK").unwrap();
5697        }
5698
5699        #[test]
5700        fn test_start_transaction_read_only_allows_reads() {
5701            let db = GrafeoDB::new_in_memory();
5702            let mut session = db.session();
5703            session.begin_transaction().unwrap();
5704            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5705            session.commit().unwrap();
5706
5707            session.execute("START TRANSACTION READ ONLY").unwrap();
5708            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5709            assert_eq!(result.rows.len(), 1);
5710            session.execute("COMMIT").unwrap();
5711        }
5712
5713        #[test]
5714        fn test_rollback_via_gql() {
5715            let db = GrafeoDB::new_in_memory();
5716            let session = db.session();
5717
5718            session.execute("START TRANSACTION").unwrap();
5719            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5720            session.execute("ROLLBACK").unwrap();
5721
5722            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5723            assert!(result.rows.is_empty());
5724        }
5725
5726        #[test]
5727        fn test_start_transaction_with_isolation_level() {
5728            let db = GrafeoDB::new_in_memory();
5729            let session = db.session();
5730
5731            session
5732                .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
5733                .unwrap();
5734            assert!(session.in_transaction());
5735            session.execute("ROLLBACK").unwrap();
5736        }
5737
5738        #[test]
5739        fn test_session_commands_return_empty_result() {
5740            let db = GrafeoDB::new_in_memory();
5741            let session = db.session();
5742
5743            session.execute("CREATE GRAPH test").unwrap();
5744            let result = session.execute("SESSION SET GRAPH test").unwrap();
5745            assert_eq!(result.row_count(), 0);
5746            assert_eq!(result.column_count(), 0);
5747        }
5748
5749        #[test]
5750        fn test_current_graph_default_is_none() {
5751            let db = GrafeoDB::new_in_memory();
5752            let session = db.session();
5753
5754            assert_eq!(session.current_graph(), None);
5755        }
5756
5757        #[test]
5758        fn test_time_zone_default_is_none() {
5759            let db = GrafeoDB::new_in_memory();
5760            let session = db.session();
5761
5762            assert_eq!(session.time_zone(), None);
5763        }
5764
5765        #[test]
5766        fn test_session_state_independent_across_sessions() {
5767            let db = GrafeoDB::new_in_memory();
5768            let session1 = db.session();
5769            let session2 = db.session();
5770
5771            session1.execute("CREATE GRAPH first").unwrap();
5772            session1.execute("CREATE GRAPH second").unwrap();
5773            session1.execute("SESSION SET GRAPH first").unwrap();
5774            session2.execute("SESSION SET GRAPH second").unwrap();
5775
5776            assert_eq!(session1.current_graph(), Some("first".to_string()));
5777            assert_eq!(session2.current_graph(), Some("second".to_string()));
5778        }
5779
5780        #[test]
5781        fn test_show_node_types() {
5782            let db = GrafeoDB::new_in_memory();
5783            let session = db.session();
5784
5785            session
5786                .execute("CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)")
5787                .unwrap();
5788
5789            let result = session.execute("SHOW NODE TYPES").unwrap();
5790            assert_eq!(
5791                result.columns,
5792                vec!["name", "properties", "constraints", "parents"]
5793            );
5794            assert_eq!(result.rows.len(), 1);
5795            // First column is the type name
5796            assert_eq!(result.rows[0][0], Value::from("Person"));
5797        }
5798
5799        #[test]
5800        fn test_show_edge_types() {
5801            let db = GrafeoDB::new_in_memory();
5802            let session = db.session();
5803
5804            session
5805                .execute("CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)")
5806                .unwrap();
5807
5808            let result = session.execute("SHOW EDGE TYPES").unwrap();
5809            assert_eq!(
5810                result.columns,
5811                vec!["name", "properties", "source_types", "target_types"]
5812            );
5813            assert_eq!(result.rows.len(), 1);
5814            assert_eq!(result.rows[0][0], Value::from("KNOWS"));
5815        }
5816
5817        #[test]
5818        fn test_show_graph_types() {
5819            let db = GrafeoDB::new_in_memory();
5820            let session = db.session();
5821
5822            session
5823                .execute("CREATE NODE TYPE Person (name STRING)")
5824                .unwrap();
5825            session
5826                .execute(
5827                    "CREATE GRAPH TYPE social (\
5828                        NODE TYPE Person (name STRING)\
5829                    )",
5830                )
5831                .unwrap();
5832
5833            let result = session.execute("SHOW GRAPH TYPES").unwrap();
5834            assert_eq!(
5835                result.columns,
5836                vec!["name", "open", "node_types", "edge_types"]
5837            );
5838            assert_eq!(result.rows.len(), 1);
5839            assert_eq!(result.rows[0][0], Value::from("social"));
5840        }
5841
5842        #[test]
5843        fn test_show_graph_type_named() {
5844            let db = GrafeoDB::new_in_memory();
5845            let session = db.session();
5846
5847            session
5848                .execute("CREATE NODE TYPE Person (name STRING)")
5849                .unwrap();
5850            session
5851                .execute(
5852                    "CREATE GRAPH TYPE social (\
5853                        NODE TYPE Person (name STRING)\
5854                    )",
5855                )
5856                .unwrap();
5857
5858            let result = session.execute("SHOW GRAPH TYPE social").unwrap();
5859            assert_eq!(result.rows.len(), 1);
5860            assert_eq!(result.rows[0][0], Value::from("social"));
5861        }
5862
5863        #[test]
5864        fn test_show_graph_type_not_found() {
5865            let db = GrafeoDB::new_in_memory();
5866            let session = db.session();
5867
5868            let result = session.execute("SHOW GRAPH TYPE nonexistent");
5869            assert!(result.is_err());
5870        }
5871
5872        #[test]
5873        fn test_show_indexes_via_gql() {
5874            let db = GrafeoDB::new_in_memory();
5875            let session = db.session();
5876
5877            let result = session.execute("SHOW INDEXES").unwrap();
5878            assert_eq!(result.columns, vec!["name", "type", "label", "property"]);
5879        }
5880
5881        #[test]
5882        fn test_show_constraints_via_gql() {
5883            let db = GrafeoDB::new_in_memory();
5884            let session = db.session();
5885
5886            let result = session.execute("SHOW CONSTRAINTS").unwrap();
5887            assert_eq!(result.columns, vec!["name", "type", "label", "properties"]);
5888        }
5889
5890        #[test]
5891        fn test_pattern_form_graph_type_roundtrip() {
5892            let db = GrafeoDB::new_in_memory();
5893            let session = db.session();
5894
5895            // Register the types first
5896            session
5897                .execute("CREATE NODE TYPE Person (name STRING NOT NULL)")
5898                .unwrap();
5899            session
5900                .execute("CREATE NODE TYPE City (name STRING)")
5901                .unwrap();
5902            session
5903                .execute("CREATE EDGE TYPE KNOWS (since INTEGER)")
5904                .unwrap();
5905            session.execute("CREATE EDGE TYPE LIVES_IN").unwrap();
5906
5907            // Create graph type using pattern form
5908            session
5909                .execute(
5910                    "CREATE GRAPH TYPE social (\
5911                        (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
5912                        (:Person)-[:LIVES_IN]->(:City)\
5913                    )",
5914                )
5915                .unwrap();
5916
5917            // Verify it was created
5918            let result = session.execute("SHOW GRAPH TYPE social").unwrap();
5919            assert_eq!(result.rows.len(), 1);
5920            assert_eq!(result.rows[0][0], Value::from("social"));
5921        }
5922    }
5923}