Skip to main content

grafeo_engine/session/
mod.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7#[cfg(feature = "rdf")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::time::{Duration, Instant};
13
14use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId, Value};
15use grafeo_common::utils::error::Result;
16use grafeo_common::{grafeo_debug_span, grafeo_info_span, grafeo_warn};
17use grafeo_core::graph::Direction;
18use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
19#[cfg(feature = "rdf")]
20use grafeo_core::graph::rdf::RdfStore;
21use grafeo_core::graph::{GraphStore, GraphStoreMut};
22
23use crate::catalog::{Catalog, CatalogConstraintValidator};
24use crate::config::{AdaptiveConfig, GraphModel};
25use crate::database::QueryResult;
26use crate::query::cache::QueryCache;
27use crate::transaction::TransactionManager;
28
29/// Parses a DDL default-value literal string into a [`Value`].
30///
31/// Handles string literals (single- or double-quoted), integers, floats,
32/// booleans (`true`/`false`), and `NULL`.
33fn parse_default_literal(text: &str) -> Value {
34    if text.eq_ignore_ascii_case("null") {
35        return Value::Null;
36    }
37    if text.eq_ignore_ascii_case("true") {
38        return Value::Bool(true);
39    }
40    if text.eq_ignore_ascii_case("false") {
41        return Value::Bool(false);
42    }
43    // String literal: strip surrounding quotes
44    if (text.starts_with('\'') && text.ends_with('\''))
45        || (text.starts_with('"') && text.ends_with('"'))
46    {
47        return Value::String(text[1..text.len() - 1].into());
48    }
49    // Try integer, then float
50    if let Ok(i) = text.parse::<i64>() {
51        return Value::Int64(i);
52    }
53    if let Ok(f) = text.parse::<f64>() {
54        return Value::Float64(f);
55    }
56    // Fallback: treat as string
57    Value::String(text.into())
58}
59
60/// Runtime configuration for creating a new session.
61///
62/// Groups the shared parameters passed to all session constructors, keeping
63/// call sites readable and avoiding long argument lists.
64pub(crate) struct SessionConfig {
65    pub transaction_manager: Arc<TransactionManager>,
66    pub query_cache: Arc<QueryCache>,
67    pub catalog: Arc<Catalog>,
68    pub adaptive_config: AdaptiveConfig,
69    pub factorized_execution: bool,
70    pub graph_model: GraphModel,
71    pub query_timeout: Option<Duration>,
72    pub commit_counter: Arc<AtomicUsize>,
73    pub gc_interval: usize,
74    /// When true, the session permanently blocks all mutations.
75    pub read_only: bool,
76}
77
78/// Your handle to the database - execute queries and manage transactions.
79///
80/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
81/// tracks its own transaction state, so you can have multiple concurrent
82/// sessions without them interfering.
83pub struct Session {
84    /// The underlying store.
85    store: Arc<LpgStore>,
86    /// Graph store trait object for pluggable storage backends (read path).
87    graph_store: Arc<dyn GraphStore>,
88    /// Writable graph store (None for read-only databases).
89    graph_store_mut: Option<Arc<dyn GraphStoreMut>>,
90    /// Schema and metadata catalog shared across sessions.
91    catalog: Arc<Catalog>,
92    /// RDF triple store (if RDF feature is enabled).
93    #[cfg(feature = "rdf")]
94    rdf_store: Arc<RdfStore>,
95    /// Transaction manager.
96    transaction_manager: Arc<TransactionManager>,
97    /// Query cache shared across sessions.
98    query_cache: Arc<QueryCache>,
99    /// Current transaction ID (if any). Behind a Mutex so that GQL commands
100    /// (`START TRANSACTION`, `COMMIT`, `ROLLBACK`) can manage transactions
101    /// from within `execute(&self)`.
102    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
103    /// Whether the current transaction is read-only (blocks mutations).
104    read_only_tx: parking_lot::Mutex<bool>,
105    /// Whether the database itself is read-only (set at open time, never changes).
106    /// When true, `read_only_tx` is always true regardless of transaction flags.
107    db_read_only: bool,
108    /// Whether the session is in auto-commit mode.
109    auto_commit: bool,
110    /// Adaptive execution configuration.
111    #[allow(dead_code)] // Stored for future adaptive re-optimization during execution
112    adaptive_config: AdaptiveConfig,
113    /// Whether to use factorized execution for multi-hop queries.
114    factorized_execution: bool,
115    /// The graph data model this session operates on.
116    graph_model: GraphModel,
117    /// Maximum time a query may run before being cancelled.
118    query_timeout: Option<Duration>,
119    /// Shared commit counter for triggering auto-GC.
120    commit_counter: Arc<AtomicUsize>,
121    /// GC every N commits (0 = disabled).
122    gc_interval: usize,
123    /// Node count at the start of the current transaction (for PreparedCommit stats).
124    transaction_start_node_count: AtomicUsize,
125    /// Edge count at the start of the current transaction (for PreparedCommit stats).
126    transaction_start_edge_count: AtomicUsize,
127    /// WAL for logging schema changes.
128    #[cfg(feature = "wal")]
129    wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
130    /// Shared WAL graph context tracker for named graph awareness.
131    #[cfg(feature = "wal")]
132    wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
133    /// CDC log for change tracking.
134    #[cfg(feature = "cdc")]
135    cdc_log: Arc<crate::cdc::CdcLog>,
136    /// Buffered CDC events for the current transaction.
137    /// Flushed to `cdc_log` on commit, discarded on rollback.
138    #[cfg(feature = "cdc")]
139    cdc_pending_events: Option<Arc<parking_lot::Mutex<Vec<crate::cdc::ChangeEvent>>>>,
140    /// Current graph name (for multi-graph USE GRAPH support). None = default graph.
141    current_graph: parking_lot::Mutex<Option<String>>,
142    /// Current schema name (ISO/IEC 39075 Section 4.7.3: independent from session graph).
143    /// None = "not set" (uses default schema).
144    current_schema: parking_lot::Mutex<Option<String>>,
145    /// Session time zone override.
146    time_zone: parking_lot::Mutex<Option<String>>,
147    /// Session-level parameters (SET PARAMETER).
148    session_params:
149        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
150    /// Override epoch for time-travel queries (None = use transaction/current epoch).
151    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
152    /// Savepoints within the current transaction.
153    savepoints: parking_lot::Mutex<Vec<SavepointState>>,
154    /// Nesting depth for nested transactions (0 = outermost).
155    /// Nested `START TRANSACTION` creates an auto-savepoint; nested `COMMIT`
156    /// releases it, nested `ROLLBACK` rolls back to it.
157    transaction_nesting_depth: parking_lot::Mutex<u32>,
158    /// Named graphs touched during the current transaction (for cross-graph atomicity).
159    /// `None` represents the default graph. Populated at `BEGIN` time and on each
160    /// `USE GRAPH` / `SESSION SET GRAPH` switch within a transaction.
161    touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
162    /// Shared metrics registry (populated when the `metrics` feature is enabled).
163    #[cfg(feature = "metrics")]
164    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
165    /// Transaction start time for duration tracking.
166    #[cfg(feature = "metrics")]
167    tx_start_time: parking_lot::Mutex<Option<Instant>>,
168}
169
170/// Per-graph savepoint snapshot, capturing the store state at the time of the savepoint.
171#[derive(Clone)]
172struct GraphSavepoint {
173    graph_name: Option<String>,
174    next_node_id: u64,
175    next_edge_id: u64,
176    undo_log_position: usize,
177}
178
179/// Savepoint state: name + per-graph snapshots + the graph that was active.
180#[derive(Clone)]
181struct SavepointState {
182    name: String,
183    graph_snapshots: Vec<GraphSavepoint>,
184    /// The graph that was active when the savepoint was created.
185    /// Reserved for future use (e.g., restoring graph context on rollback).
186    #[allow(dead_code)]
187    active_graph: Option<String>,
188    /// CDC event buffer position at savepoint creation.
189    /// On rollback-to-savepoint, the buffer is truncated to this position.
190    #[cfg(feature = "cdc")]
191    cdc_event_position: usize,
192}
193
194impl Session {
195    /// Creates a new session with adaptive execution configuration.
196    #[allow(dead_code)]
197    pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
198        let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
199        let graph_store_mut = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
200        Self {
201            store,
202            graph_store,
203            graph_store_mut,
204            catalog: cfg.catalog,
205            #[cfg(feature = "rdf")]
206            rdf_store: Arc::new(RdfStore::new()),
207            transaction_manager: cfg.transaction_manager,
208            query_cache: cfg.query_cache,
209            current_transaction: parking_lot::Mutex::new(None),
210            read_only_tx: parking_lot::Mutex::new(cfg.read_only),
211            db_read_only: cfg.read_only,
212            auto_commit: true,
213            adaptive_config: cfg.adaptive_config,
214            factorized_execution: cfg.factorized_execution,
215            graph_model: cfg.graph_model,
216            query_timeout: cfg.query_timeout,
217            commit_counter: cfg.commit_counter,
218            gc_interval: cfg.gc_interval,
219            transaction_start_node_count: AtomicUsize::new(0),
220            transaction_start_edge_count: AtomicUsize::new(0),
221            #[cfg(feature = "wal")]
222            wal: None,
223            #[cfg(feature = "wal")]
224            wal_graph_context: None,
225            #[cfg(feature = "cdc")]
226            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
227            #[cfg(feature = "cdc")]
228            cdc_pending_events: None,
229            current_graph: parking_lot::Mutex::new(None),
230            current_schema: parking_lot::Mutex::new(None),
231            time_zone: parking_lot::Mutex::new(None),
232            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
233            viewing_epoch_override: parking_lot::Mutex::new(None),
234            savepoints: parking_lot::Mutex::new(Vec::new()),
235            transaction_nesting_depth: parking_lot::Mutex::new(0),
236            touched_graphs: parking_lot::Mutex::new(Vec::new()),
237            #[cfg(feature = "metrics")]
238            metrics: None,
239            #[cfg(feature = "metrics")]
240            tx_start_time: parking_lot::Mutex::new(None),
241        }
242    }
243
244    /// Sets the WAL for this session (shared with the database).
245    ///
246    /// This also wraps `graph_store` in a [`WalGraphStore`] so that mutation
247    /// operators (INSERT, DELETE, SET via queries) log to the WAL.
248    #[cfg(feature = "wal")]
249    pub(crate) fn set_wal(
250        &mut self,
251        wal: Arc<grafeo_adapters::storage::wal::LpgWal>,
252        wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
253    ) {
254        // Wrap the graph store so query-engine mutations are WAL-logged
255        let wal_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
256            Arc::clone(&self.store),
257            Arc::clone(&wal),
258            Arc::clone(&wal_graph_context),
259        ));
260        self.graph_store = Arc::clone(&wal_store) as Arc<dyn GraphStore>;
261        self.graph_store_mut = Some(wal_store as Arc<dyn GraphStoreMut>);
262        self.wal = Some(wal);
263        self.wal_graph_context = Some(wal_graph_context);
264    }
265
266    /// Sets the CDC log for this session (shared with the database).
267    ///
268    /// Wraps the current write store with a `CdcGraphStore` decorator so
269    /// that all session mutations (INSERT, SET, DELETE via query execution)
270    /// buffer CDC events. The buffer is flushed to the `CdcLog` on commit
271    /// and discarded on rollback.
272    #[cfg(feature = "cdc")]
273    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
274        // Wrap the WRITE store only with CdcGraphStore to intercept mutations.
275        // The read store (self.graph_store) is left unchanged for zero read overhead.
276        if let Some(ref write_store) = self.graph_store_mut {
277            let cdc_store = Arc::new(crate::database::cdc_store::CdcGraphStore::new(
278                Arc::clone(write_store),
279                Arc::clone(&cdc_log),
280            ));
281            self.cdc_pending_events = Some(cdc_store.pending_events());
282            self.graph_store_mut = Some(cdc_store as Arc<dyn grafeo_core::graph::GraphStoreMut>);
283        }
284        self.cdc_log = cdc_log;
285    }
286
287    /// Sets the metrics registry for this session (shared with the database).
288    #[cfg(feature = "metrics")]
289    pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
290        self.metrics = Some(metrics);
291    }
292
293    /// Creates a session backed by an external graph store.
294    ///
295    /// The external store handles all data operations. Transaction management
296    /// (begin/commit/rollback) is not supported for external stores.
297    ///
298    /// # Errors
299    ///
300    /// Returns an error if the internal arena allocation fails (out of memory).
301    pub(crate) fn with_external_store(
302        read_store: Arc<dyn GraphStore>,
303        write_store: Option<Arc<dyn GraphStoreMut>>,
304        cfg: SessionConfig,
305    ) -> Result<Self> {
306        Ok(Self {
307            store: Arc::new(LpgStore::new()?),
308            graph_store: read_store,
309            graph_store_mut: write_store,
310            catalog: cfg.catalog,
311            #[cfg(feature = "rdf")]
312            rdf_store: Arc::new(RdfStore::new()),
313            transaction_manager: cfg.transaction_manager,
314            query_cache: cfg.query_cache,
315            current_transaction: parking_lot::Mutex::new(None),
316            read_only_tx: parking_lot::Mutex::new(cfg.read_only),
317            db_read_only: cfg.read_only,
318            auto_commit: true,
319            adaptive_config: cfg.adaptive_config,
320            factorized_execution: cfg.factorized_execution,
321            graph_model: cfg.graph_model,
322            query_timeout: cfg.query_timeout,
323            commit_counter: cfg.commit_counter,
324            gc_interval: cfg.gc_interval,
325            transaction_start_node_count: AtomicUsize::new(0),
326            transaction_start_edge_count: AtomicUsize::new(0),
327            #[cfg(feature = "wal")]
328            wal: None,
329            #[cfg(feature = "wal")]
330            wal_graph_context: None,
331            #[cfg(feature = "cdc")]
332            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
333            #[cfg(feature = "cdc")]
334            cdc_pending_events: None,
335            current_graph: parking_lot::Mutex::new(None),
336            current_schema: parking_lot::Mutex::new(None),
337            time_zone: parking_lot::Mutex::new(None),
338            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
339            viewing_epoch_override: parking_lot::Mutex::new(None),
340            savepoints: parking_lot::Mutex::new(Vec::new()),
341            transaction_nesting_depth: parking_lot::Mutex::new(0),
342            touched_graphs: parking_lot::Mutex::new(Vec::new()),
343            #[cfg(feature = "metrics")]
344            metrics: None,
345            #[cfg(feature = "metrics")]
346            tx_start_time: parking_lot::Mutex::new(None),
347        })
348    }
349
350    /// Returns the graph model this session operates on.
351    #[must_use]
352    pub fn graph_model(&self) -> GraphModel {
353        self.graph_model
354    }
355
356    // === Session State Management ===
357
358    /// Sets the current graph for this session (USE GRAPH).
359    pub fn use_graph(&self, name: &str) {
360        *self.current_graph.lock() = Some(name.to_string());
361    }
362
363    /// Returns the current graph name, if any.
364    #[must_use]
365    pub fn current_graph(&self) -> Option<String> {
366        self.current_graph.lock().clone()
367    }
368
369    /// Sets the current schema for this session (SESSION SET SCHEMA).
370    ///
371    /// Per ISO/IEC 39075 Section 7.1 GR1, this is independent of the session graph.
372    pub fn set_schema(&self, name: &str) {
373        *self.current_schema.lock() = Some(name.to_string());
374    }
375
376    /// Returns the current schema name, if any.
377    ///
378    /// `None` means "not set", which resolves to the default schema.
379    #[must_use]
380    pub fn current_schema(&self) -> Option<String> {
381        self.current_schema.lock().clone()
382    }
383
384    /// Computes the effective storage key for a graph, accounting for schema context.
385    ///
386    /// Per ISO/IEC 39075 Section 17.2, graphs resolve relative to the current schema.
387    /// Uses `/` as separator since it is invalid in GQL identifiers.
388    fn effective_graph_key(&self, graph_name: &str) -> String {
389        let schema = self.current_schema.lock().clone();
390        match schema {
391            Some(s) => format!("{s}/{graph_name}"),
392            None => graph_name.to_string(),
393        }
394    }
395
396    /// Computes the effective storage key for a type, accounting for schema context.
397    ///
398    /// Mirrors `effective_graph_key()`: types resolve relative to the current schema.
399    fn effective_type_key(&self, type_name: &str) -> String {
400        let schema = self.current_schema.lock().clone();
401        match schema {
402            Some(s) => format!("{s}/{type_name}"),
403            None => type_name.to_string(),
404        }
405    }
406
407    /// Returns the effective storage key for the current graph, accounting for schema.
408    ///
409    /// Combines `current_schema` and `current_graph` into a flat lookup key.
410    fn active_graph_storage_key(&self) -> Option<String> {
411        let graph = self.current_graph.lock().clone();
412        let schema = self.current_schema.lock().clone();
413        match (schema, graph) {
414            (_, None) => None,
415            (_, Some(ref name)) if name.eq_ignore_ascii_case("default") => None,
416            (None, Some(name)) => Some(name),
417            (Some(s), Some(g)) => Some(format!("{s}/{g}")),
418        }
419    }
420
421    /// Returns the graph store for the currently active graph.
422    ///
423    /// If `current_graph` is `None` or `"default"`, returns the session's
424    /// default `graph_store` (already WAL-wrapped for the default graph).
425    /// Otherwise looks up the named graph in the root store and wraps it
426    /// in a [`WalGraphStore`] so mutations are WAL-logged with the correct
427    /// graph context.
428    fn active_store(&self) -> Arc<dyn GraphStore> {
429        let key = self.active_graph_storage_key();
430        match key {
431            None => Arc::clone(&self.graph_store),
432            Some(ref name) => match self.store.graph(name) {
433                Some(named_store) => {
434                    #[cfg(feature = "wal")]
435                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
436                        return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
437                            named_store,
438                            Arc::clone(wal),
439                            name.clone(),
440                            Arc::clone(ctx),
441                        )) as Arc<dyn GraphStore>;
442                    }
443                    named_store as Arc<dyn GraphStore>
444                }
445                None => Arc::clone(&self.graph_store),
446            },
447        }
448    }
449
450    /// Returns the writable store for the active graph, if available.
451    ///
452    /// Returns `None` for read-only databases. For named graphs, wraps
453    /// the store with WAL logging when durability is enabled.
454    fn active_write_store(&self) -> Option<Arc<dyn GraphStoreMut>> {
455        let key = self.active_graph_storage_key();
456        match key {
457            None => self.graph_store_mut.as_ref().map(Arc::clone),
458            Some(ref name) => match self.store.graph(name) {
459                Some(named_store) => {
460                    let mut store: Arc<dyn GraphStoreMut> = named_store;
461
462                    #[cfg(feature = "wal")]
463                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
464                        store = Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
465                            // WAL needs Arc<LpgStore>, get it fresh
466                            self.store
467                                .graph(name)
468                                .unwrap_or_else(|| Arc::clone(&self.store)),
469                            Arc::clone(wal),
470                            name.clone(),
471                            Arc::clone(ctx),
472                        ));
473                    }
474
475                    #[cfg(feature = "cdc")]
476                    if let Some(ref pending) = self.cdc_pending_events {
477                        store = Arc::new(crate::database::cdc_store::CdcGraphStore::wrap(
478                            store,
479                            Arc::clone(&self.cdc_log),
480                            Arc::clone(pending),
481                        ));
482                    }
483
484                    Some(store)
485                }
486                None => self.graph_store_mut.as_ref().map(Arc::clone),
487            },
488        }
489    }
490
491    /// Returns the concrete `LpgStore` for the currently active graph.
492    ///
493    /// Used by direct CRUD methods that need the concrete store type
494    /// for versioned operations.
495    fn active_lpg_store(&self) -> Arc<LpgStore> {
496        let key = self.active_graph_storage_key();
497        match key {
498            None => Arc::clone(&self.store),
499            Some(ref name) => self
500                .store
501                .graph(name)
502                .unwrap_or_else(|| Arc::clone(&self.store)),
503        }
504    }
505
506    /// Resolves a graph name to a concrete `LpgStore`.
507    /// `None` and `"default"` resolve to the session's root store.
508    fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
509        match graph_name {
510            None => Arc::clone(&self.store),
511            Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
512            Some(name) => self
513                .store
514                .graph(name)
515                .unwrap_or_else(|| Arc::clone(&self.store)),
516        }
517    }
518
519    /// Records the current graph as "touched" if a transaction is active.
520    ///
521    /// Uses the full storage key (schema/graph) so that commit/rollback
522    /// can resolve the correct store via `resolve_store`.
523    fn track_graph_touch(&self) {
524        if self.current_transaction.lock().is_some() {
525            let key = self.active_graph_storage_key();
526            let mut touched = self.touched_graphs.lock();
527            if !touched.contains(&key) {
528                touched.push(key);
529            }
530        }
531    }
532
533    /// Sets the session time zone.
534    pub fn set_time_zone(&self, tz: &str) {
535        *self.time_zone.lock() = Some(tz.to_string());
536    }
537
538    /// Returns the session time zone, if set.
539    #[must_use]
540    pub fn time_zone(&self) -> Option<String> {
541        self.time_zone.lock().clone()
542    }
543
544    /// Sets a session parameter.
545    pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
546        self.session_params.lock().insert(key.to_string(), value);
547    }
548
549    /// Gets a session parameter by cloning it.
550    #[must_use]
551    pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
552        self.session_params.lock().get(key).cloned()
553    }
554
555    /// Resets all session state to defaults (ISO/IEC 39075 Section 7.2).
556    pub fn reset_session(&self) {
557        *self.current_schema.lock() = None;
558        *self.current_graph.lock() = None;
559        *self.time_zone.lock() = None;
560        self.session_params.lock().clear();
561        *self.viewing_epoch_override.lock() = None;
562    }
563
564    /// Resets only the session schema (Section 7.2 GR1).
565    pub fn reset_schema(&self) {
566        *self.current_schema.lock() = None;
567    }
568
569    /// Resets only the session graph (Section 7.2 GR2).
570    pub fn reset_graph(&self) {
571        *self.current_graph.lock() = None;
572    }
573
574    /// Resets only the session time zone (Section 7.2 GR3).
575    pub fn reset_time_zone(&self) {
576        *self.time_zone.lock() = None;
577    }
578
579    /// Resets only session parameters (Section 7.2 GR4).
580    pub fn reset_parameters(&self) {
581        self.session_params.lock().clear();
582    }
583
584    // --- Time-travel API ---
585
586    /// Sets a viewing epoch override for time-travel queries.
587    ///
588    /// While set, all queries on this session see the database as it existed
589    /// at the given epoch. Use [`clear_viewing_epoch`](Self::clear_viewing_epoch)
590    /// to return to normal behavior.
591    pub fn set_viewing_epoch(&self, epoch: EpochId) {
592        *self.viewing_epoch_override.lock() = Some(epoch);
593    }
594
595    /// Clears the viewing epoch override, returning to normal behavior.
596    pub fn clear_viewing_epoch(&self) {
597        *self.viewing_epoch_override.lock() = None;
598    }
599
600    /// Returns the current viewing epoch override, if any.
601    #[must_use]
602    pub fn viewing_epoch(&self) -> Option<EpochId> {
603        *self.viewing_epoch_override.lock()
604    }
605
606    /// Returns all versions of a node with their creation/deletion epochs.
607    ///
608    /// Properties and labels reflect the current state (not versioned per-epoch).
609    #[must_use]
610    pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
611        self.active_lpg_store().get_node_history(id)
612    }
613
614    /// Returns all versions of an edge with their creation/deletion epochs.
615    ///
616    /// Properties reflect the current state (not versioned per-epoch).
617    #[must_use]
618    pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
619        self.active_lpg_store().get_edge_history(id)
620    }
621
622    /// Checks that the session's graph model supports LPG operations.
623    fn require_lpg(&self, language: &str) -> Result<()> {
624        if self.graph_model == GraphModel::Rdf {
625            return Err(grafeo_common::utils::error::Error::Internal(format!(
626                "This is an RDF database. {language} queries require an LPG database."
627            )));
628        }
629        Ok(())
630    }
631
632    /// Executes a session or transaction command, returning an empty result.
633    #[cfg(feature = "gql")]
634    fn execute_session_command(
635        &self,
636        cmd: grafeo_adapters::query::gql::ast::SessionCommand,
637    ) -> Result<QueryResult> {
638        use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
639        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
640
641        // Block DDL in read-only transactions (ISO/IEC 39075 Section 8)
642        if *self.read_only_tx.lock() {
643            match &cmd {
644                SessionCommand::CreateGraph { .. } | SessionCommand::DropGraph { .. } => {
645                    return Err(Error::Transaction(
646                        grafeo_common::utils::error::TransactionError::ReadOnly,
647                    ));
648                }
649                _ => {} // Session state + transaction control allowed
650            }
651        }
652
653        match cmd {
654            SessionCommand::CreateGraph {
655                name,
656                if_not_exists,
657                typed,
658                like_graph,
659                copy_of,
660                open: _,
661            } => {
662                // ISO/IEC 39075 Section 12.4: graphs are created within the current schema
663                let storage_key = self.effective_graph_key(&name);
664
665                // Validate source graph exists for LIKE / AS COPY OF
666                if let Some(ref src) = like_graph {
667                    let src_key = self.effective_graph_key(src);
668                    if self.store.graph(&src_key).is_none() {
669                        return Err(Error::Query(QueryError::new(
670                            QueryErrorKind::Semantic,
671                            format!("Source graph '{src}' does not exist"),
672                        )));
673                    }
674                }
675                if let Some(ref src) = copy_of {
676                    let src_key = self.effective_graph_key(src);
677                    if self.store.graph(&src_key).is_none() {
678                        return Err(Error::Query(QueryError::new(
679                            QueryErrorKind::Semantic,
680                            format!("Source graph '{src}' does not exist"),
681                        )));
682                    }
683                }
684
685                let created = self
686                    .store
687                    .create_graph(&storage_key)
688                    .map_err(|e| Error::Internal(e.to_string()))?;
689                if !created && !if_not_exists {
690                    return Err(Error::Query(QueryError::new(
691                        QueryErrorKind::Semantic,
692                        format!("Graph '{name}' already exists"),
693                    )));
694                }
695                if created {
696                    #[cfg(feature = "wal")]
697                    self.log_schema_wal(
698                        &grafeo_adapters::storage::wal::WalRecord::CreateNamedGraph {
699                            name: storage_key.clone(),
700                        },
701                    );
702                }
703
704                // AS COPY OF: copy data from source graph
705                if let Some(ref src) = copy_of {
706                    let src_key = self.effective_graph_key(src);
707                    self.store
708                        .copy_graph(Some(&src_key), Some(&storage_key))
709                        .map_err(|e| Error::Internal(e.to_string()))?;
710                }
711
712                // Bind to graph type if specified.
713                // If the parser produced a '/' in the name it is already a qualified
714                // "schema/type" key; otherwise resolve against the current schema.
715                if let Some(type_name) = typed
716                    && let Err(e) = self.catalog.bind_graph_type(
717                        &storage_key,
718                        if type_name.contains('/') {
719                            type_name.clone()
720                        } else {
721                            self.effective_type_key(&type_name)
722                        },
723                    )
724                {
725                    return Err(Error::Query(QueryError::new(
726                        QueryErrorKind::Semantic,
727                        e.to_string(),
728                    )));
729                }
730
731                // LIKE: copy graph type binding from source
732                if let Some(ref src) = like_graph {
733                    let src_key = self.effective_graph_key(src);
734                    if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
735                        let _ = self.catalog.bind_graph_type(&storage_key, src_type);
736                    }
737                }
738
739                Ok(QueryResult::empty())
740            }
741            SessionCommand::DropGraph { name, if_exists } => {
742                let storage_key = self.effective_graph_key(&name);
743                let dropped = self.store.drop_graph(&storage_key);
744                if !dropped && !if_exists {
745                    return Err(Error::Query(QueryError::new(
746                        QueryErrorKind::Semantic,
747                        format!("Graph '{name}' does not exist"),
748                    )));
749                }
750                if dropped {
751                    #[cfg(feature = "wal")]
752                    self.log_schema_wal(
753                        &grafeo_adapters::storage::wal::WalRecord::DropNamedGraph {
754                            name: storage_key.clone(),
755                        },
756                    );
757                    // If this session was using the dropped graph, reset to default
758                    let mut current = self.current_graph.lock();
759                    if current
760                        .as_deref()
761                        .is_some_and(|g| g.eq_ignore_ascii_case(&name))
762                    {
763                        *current = None;
764                    }
765                }
766                Ok(QueryResult::empty())
767            }
768            SessionCommand::UseGraph(name) => {
769                // Verify graph exists (resolve within current schema)
770                let effective_key = self.effective_graph_key(&name);
771                if !name.eq_ignore_ascii_case("default")
772                    && self.store.graph(&effective_key).is_none()
773                {
774                    return Err(Error::Query(QueryError::new(
775                        QueryErrorKind::Semantic,
776                        format!("Graph '{name}' does not exist"),
777                    )));
778                }
779                self.use_graph(&name);
780                // Track the new graph if in a transaction
781                self.track_graph_touch();
782                Ok(QueryResult::empty())
783            }
784            SessionCommand::SessionSetGraph(name) => {
785                // ISO/IEC 39075 Section 7.1 GR2: set session graph (resolved within current schema)
786                let effective_key = self.effective_graph_key(&name);
787                if !name.eq_ignore_ascii_case("default")
788                    && self.store.graph(&effective_key).is_none()
789                {
790                    return Err(Error::Query(QueryError::new(
791                        QueryErrorKind::Semantic,
792                        format!("Graph '{name}' does not exist"),
793                    )));
794                }
795                self.use_graph(&name);
796                // Track the new graph if in a transaction
797                self.track_graph_touch();
798                Ok(QueryResult::empty())
799            }
800            SessionCommand::SessionSetSchema(name) => {
801                // ISO/IEC 39075 Section 7.1 GR1: set session schema (independent of graph)
802                if !self.catalog.schema_exists(&name) {
803                    return Err(Error::Query(QueryError::new(
804                        QueryErrorKind::Semantic,
805                        format!("Schema '{name}' does not exist"),
806                    )));
807                }
808                self.set_schema(&name);
809                Ok(QueryResult::empty())
810            }
811            SessionCommand::SessionSetTimeZone(tz) => {
812                self.set_time_zone(&tz);
813                Ok(QueryResult::empty())
814            }
815            SessionCommand::SessionSetParameter(key, expr) => {
816                if key.eq_ignore_ascii_case("viewing_epoch") {
817                    match Self::eval_integer_literal(&expr) {
818                        Some(n) if n >= 0 => {
819                            self.set_viewing_epoch(EpochId::new(n as u64));
820                            Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
821                        }
822                        _ => Err(Error::Query(QueryError::new(
823                            QueryErrorKind::Semantic,
824                            "viewing_epoch must be a non-negative integer literal",
825                        ))),
826                    }
827                } else {
828                    // For now, store parameter name with Null value.
829                    // Full expression evaluation would require building and executing a plan.
830                    self.set_parameter(&key, Value::Null);
831                    Ok(QueryResult::empty())
832                }
833            }
834            SessionCommand::SessionReset(target) => {
835                use grafeo_adapters::query::gql::ast::SessionResetTarget;
836                match target {
837                    SessionResetTarget::All => self.reset_session(),
838                    SessionResetTarget::Schema => self.reset_schema(),
839                    SessionResetTarget::Graph => self.reset_graph(),
840                    SessionResetTarget::TimeZone => self.reset_time_zone(),
841                    SessionResetTarget::Parameters => self.reset_parameters(),
842                }
843                Ok(QueryResult::empty())
844            }
845            SessionCommand::SessionClose => {
846                self.reset_session();
847                Ok(QueryResult::empty())
848            }
849            SessionCommand::StartTransaction {
850                read_only,
851                isolation_level,
852            } => {
853                let engine_level = isolation_level.map(|l| match l {
854                    TransactionIsolationLevel::ReadCommitted => {
855                        crate::transaction::IsolationLevel::ReadCommitted
856                    }
857                    TransactionIsolationLevel::SnapshotIsolation => {
858                        crate::transaction::IsolationLevel::SnapshotIsolation
859                    }
860                    TransactionIsolationLevel::Serializable => {
861                        crate::transaction::IsolationLevel::Serializable
862                    }
863                });
864                self.begin_transaction_inner(read_only, engine_level)?;
865                Ok(QueryResult::status("Transaction started"))
866            }
867            SessionCommand::Commit => {
868                self.commit_inner()?;
869                Ok(QueryResult::status("Transaction committed"))
870            }
871            SessionCommand::Rollback => {
872                self.rollback_inner()?;
873                Ok(QueryResult::status("Transaction rolled back"))
874            }
875            SessionCommand::Savepoint(name) => {
876                self.savepoint(&name)?;
877                Ok(QueryResult::status(format!("Savepoint '{name}' created")))
878            }
879            SessionCommand::RollbackToSavepoint(name) => {
880                self.rollback_to_savepoint(&name)?;
881                Ok(QueryResult::status(format!(
882                    "Rolled back to savepoint '{name}'"
883                )))
884            }
885            SessionCommand::ReleaseSavepoint(name) => {
886                self.release_savepoint(&name)?;
887                Ok(QueryResult::status(format!("Savepoint '{name}' released")))
888            }
889        }
890    }
891
892    /// Logs a WAL record for a schema change (no-op if WAL is not enabled).
893    #[cfg(feature = "wal")]
894    fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
895        if let Some(ref wal) = self.wal
896            && let Err(e) = wal.log(record)
897        {
898            grafeo_warn!("Failed to log schema change to WAL: {}", e);
899        }
900    }
901
902    /// Executes a schema DDL command, returning a status result.
903    #[cfg(feature = "gql")]
904    fn execute_schema_command(
905        &self,
906        cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
907    ) -> Result<QueryResult> {
908        use crate::catalog::{
909            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
910        };
911        use grafeo_adapters::query::gql::ast::SchemaStatement;
912        #[cfg(feature = "wal")]
913        use grafeo_adapters::storage::wal::WalRecord;
914        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
915
916        /// Logs a WAL record for schema changes. Compiles to nothing without `wal`.
917        macro_rules! wal_log {
918            ($self:expr, $record:expr) => {
919                #[cfg(feature = "wal")]
920                $self.log_schema_wal(&$record);
921            };
922        }
923
924        let result = match cmd {
925            SchemaStatement::CreateNodeType(stmt) => {
926                let effective_name = self.effective_type_key(&stmt.name);
927                #[cfg(feature = "wal")]
928                let props_for_wal: Vec<(String, String, bool)> = stmt
929                    .properties
930                    .iter()
931                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
932                    .collect();
933                let def = NodeTypeDefinition {
934                    name: effective_name.clone(),
935                    properties: stmt
936                        .properties
937                        .iter()
938                        .map(|p| TypedProperty {
939                            name: p.name.clone(),
940                            data_type: PropertyDataType::from_type_name(&p.data_type),
941                            nullable: p.nullable,
942                            default_value: p
943                                .default_value
944                                .as_ref()
945                                .map(|s| parse_default_literal(s)),
946                        })
947                        .collect(),
948                    constraints: Vec::new(),
949                    parent_types: stmt.parent_types.clone(),
950                };
951                let result = if stmt.or_replace {
952                    let _ = self.catalog.drop_node_type(&effective_name);
953                    self.catalog.register_node_type(def)
954                } else {
955                    self.catalog.register_node_type(def)
956                };
957                match result {
958                    Ok(()) => {
959                        wal_log!(
960                            self,
961                            WalRecord::CreateNodeType {
962                                name: effective_name.clone(),
963                                properties: props_for_wal,
964                                constraints: Vec::new(),
965                            }
966                        );
967                        Ok(QueryResult::status(format!(
968                            "Created node type '{}'",
969                            stmt.name
970                        )))
971                    }
972                    Err(e) if stmt.if_not_exists => {
973                        let _ = e;
974                        Ok(QueryResult::status("No change"))
975                    }
976                    Err(e) => Err(Error::Query(QueryError::new(
977                        QueryErrorKind::Semantic,
978                        e.to_string(),
979                    ))),
980                }
981            }
982            SchemaStatement::CreateEdgeType(stmt) => {
983                let effective_name = self.effective_type_key(&stmt.name);
984                #[cfg(feature = "wal")]
985                let props_for_wal: Vec<(String, String, bool)> = stmt
986                    .properties
987                    .iter()
988                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
989                    .collect();
990                let def = EdgeTypeDefinition {
991                    name: effective_name.clone(),
992                    properties: stmt
993                        .properties
994                        .iter()
995                        .map(|p| TypedProperty {
996                            name: p.name.clone(),
997                            data_type: PropertyDataType::from_type_name(&p.data_type),
998                            nullable: p.nullable,
999                            default_value: p
1000                                .default_value
1001                                .as_ref()
1002                                .map(|s| parse_default_literal(s)),
1003                        })
1004                        .collect(),
1005                    constraints: Vec::new(),
1006                    source_node_types: stmt.source_node_types.clone(),
1007                    target_node_types: stmt.target_node_types.clone(),
1008                };
1009                let result = if stmt.or_replace {
1010                    let _ = self.catalog.drop_edge_type_def(&effective_name);
1011                    self.catalog.register_edge_type_def(def)
1012                } else {
1013                    self.catalog.register_edge_type_def(def)
1014                };
1015                match result {
1016                    Ok(()) => {
1017                        wal_log!(
1018                            self,
1019                            WalRecord::CreateEdgeType {
1020                                name: effective_name.clone(),
1021                                properties: props_for_wal,
1022                                constraints: Vec::new(),
1023                            }
1024                        );
1025                        Ok(QueryResult::status(format!(
1026                            "Created edge type '{}'",
1027                            stmt.name
1028                        )))
1029                    }
1030                    Err(e) if stmt.if_not_exists => {
1031                        let _ = e;
1032                        Ok(QueryResult::status("No change"))
1033                    }
1034                    Err(e) => Err(Error::Query(QueryError::new(
1035                        QueryErrorKind::Semantic,
1036                        e.to_string(),
1037                    ))),
1038                }
1039            }
1040            SchemaStatement::CreateVectorIndex(stmt) => {
1041                Self::create_vector_index_on_store(
1042                    &self.active_lpg_store(),
1043                    &stmt.node_label,
1044                    &stmt.property,
1045                    stmt.dimensions,
1046                    stmt.metric.as_deref(),
1047                )?;
1048                wal_log!(
1049                    self,
1050                    WalRecord::CreateIndex {
1051                        name: stmt.name.clone(),
1052                        label: stmt.node_label.clone(),
1053                        property: stmt.property.clone(),
1054                        index_type: "vector".to_string(),
1055                    }
1056                );
1057                Ok(QueryResult::status(format!(
1058                    "Created vector index '{}'",
1059                    stmt.name
1060                )))
1061            }
1062            SchemaStatement::DropNodeType { name, if_exists } => {
1063                let effective_name = self.effective_type_key(&name);
1064                match self.catalog.drop_node_type(&effective_name) {
1065                    Ok(()) => {
1066                        wal_log!(
1067                            self,
1068                            WalRecord::DropNodeType {
1069                                name: effective_name
1070                            }
1071                        );
1072                        Ok(QueryResult::status(format!("Dropped node type '{name}'")))
1073                    }
1074                    Err(e) if if_exists => {
1075                        let _ = e;
1076                        Ok(QueryResult::status("No change"))
1077                    }
1078                    Err(e) => Err(Error::Query(QueryError::new(
1079                        QueryErrorKind::Semantic,
1080                        e.to_string(),
1081                    ))),
1082                }
1083            }
1084            SchemaStatement::DropEdgeType { name, if_exists } => {
1085                let effective_name = self.effective_type_key(&name);
1086                match self.catalog.drop_edge_type_def(&effective_name) {
1087                    Ok(()) => {
1088                        wal_log!(
1089                            self,
1090                            WalRecord::DropEdgeType {
1091                                name: effective_name
1092                            }
1093                        );
1094                        Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
1095                    }
1096                    Err(e) if if_exists => {
1097                        let _ = e;
1098                        Ok(QueryResult::status("No change"))
1099                    }
1100                    Err(e) => Err(Error::Query(QueryError::new(
1101                        QueryErrorKind::Semantic,
1102                        e.to_string(),
1103                    ))),
1104                }
1105            }
1106            SchemaStatement::CreateIndex(stmt) => {
1107                use crate::catalog::IndexType as CatalogIndexType;
1108                use grafeo_adapters::query::gql::ast::IndexKind;
1109                let active = self.active_lpg_store();
1110                let index_type_str = match stmt.index_kind {
1111                    IndexKind::Property => "property",
1112                    IndexKind::BTree => "btree",
1113                    IndexKind::Text => "text",
1114                    IndexKind::Vector => "vector",
1115                };
1116                match stmt.index_kind {
1117                    IndexKind::Property | IndexKind::BTree => {
1118                        for prop in &stmt.properties {
1119                            active.create_property_index(prop);
1120                        }
1121                    }
1122                    IndexKind::Text => {
1123                        for prop in &stmt.properties {
1124                            Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1125                        }
1126                    }
1127                    IndexKind::Vector => {
1128                        for prop in &stmt.properties {
1129                            Self::create_vector_index_on_store(
1130                                &active,
1131                                &stmt.label,
1132                                prop,
1133                                stmt.options.dimensions,
1134                                stmt.options.metric.as_deref(),
1135                            )?;
1136                        }
1137                    }
1138                }
1139                // Register each property index in the catalog for SHOW INDEXES
1140                // and DROP INDEX name-based lookup.
1141                let catalog_index_type = match stmt.index_kind {
1142                    IndexKind::Property => CatalogIndexType::Hash,
1143                    IndexKind::BTree => CatalogIndexType::BTree,
1144                    IndexKind::Text => CatalogIndexType::FullText,
1145                    IndexKind::Vector => CatalogIndexType::Hash,
1146                };
1147                let label_id = self.catalog.get_or_create_label(&stmt.label);
1148                for prop in &stmt.properties {
1149                    let prop_id = self.catalog.get_or_create_property_key(prop);
1150                    self.catalog
1151                        .create_index(&stmt.name, label_id, prop_id, catalog_index_type);
1152                }
1153                #[cfg(feature = "wal")]
1154                for prop in &stmt.properties {
1155                    wal_log!(
1156                        self,
1157                        WalRecord::CreateIndex {
1158                            name: stmt.name.clone(),
1159                            label: stmt.label.clone(),
1160                            property: prop.clone(),
1161                            index_type: index_type_str.to_string(),
1162                        }
1163                    );
1164                }
1165                Ok(QueryResult::status(format!(
1166                    "Created {} index '{}'",
1167                    index_type_str, stmt.name
1168                )))
1169            }
1170            SchemaStatement::DropIndex { name, if_exists } => {
1171                // Look up the index by name in the catalog to find the
1172                // underlying property, then drop from both catalog and store.
1173                if let Some(index_id) = self.catalog.find_index_by_name(&name) {
1174                    let def = self.catalog.get_index(index_id);
1175                    self.catalog.drop_index(index_id);
1176                    if let Some(def) = def
1177                        && let Some(prop_name) =
1178                            self.catalog.get_property_key_name(def.property_key)
1179                    {
1180                        self.active_lpg_store().drop_property_index(&prop_name);
1181                    }
1182                    wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1183                    Ok(QueryResult::status(format!("Dropped index '{name}'")))
1184                } else if if_exists {
1185                    Ok(QueryResult::status("No change".to_string()))
1186                } else {
1187                    Err(Error::Query(QueryError::new(
1188                        QueryErrorKind::Semantic,
1189                        format!("Index '{name}' does not exist"),
1190                    )))
1191                }
1192            }
1193            SchemaStatement::CreateConstraint(stmt) => {
1194                use crate::catalog::TypeConstraint;
1195                use grafeo_adapters::query::gql::ast::ConstraintKind;
1196                let kind_str = match stmt.constraint_kind {
1197                    ConstraintKind::Unique => "unique",
1198                    ConstraintKind::NodeKey => "node_key",
1199                    ConstraintKind::NotNull => "not_null",
1200                    ConstraintKind::Exists => "exists",
1201                };
1202                let constraint_name = stmt
1203                    .name
1204                    .clone()
1205                    .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1206
1207                // Register constraint in catalog type definitions
1208                match stmt.constraint_kind {
1209                    ConstraintKind::Unique => {
1210                        for prop in &stmt.properties {
1211                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1212                            let prop_id = self.catalog.get_or_create_property_key(prop);
1213                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1214                        }
1215                        let _ = self.catalog.add_constraint_to_type(
1216                            &stmt.label,
1217                            TypeConstraint::Unique(stmt.properties.clone()),
1218                        );
1219                    }
1220                    ConstraintKind::NodeKey => {
1221                        for prop in &stmt.properties {
1222                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1223                            let prop_id = self.catalog.get_or_create_property_key(prop);
1224                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1225                            let _ = self.catalog.add_required_property(label_id, prop_id);
1226                        }
1227                        let _ = self.catalog.add_constraint_to_type(
1228                            &stmt.label,
1229                            TypeConstraint::PrimaryKey(stmt.properties.clone()),
1230                        );
1231                    }
1232                    ConstraintKind::NotNull | ConstraintKind::Exists => {
1233                        for prop in &stmt.properties {
1234                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1235                            let prop_id = self.catalog.get_or_create_property_key(prop);
1236                            let _ = self.catalog.add_required_property(label_id, prop_id);
1237                            let _ = self.catalog.add_constraint_to_type(
1238                                &stmt.label,
1239                                TypeConstraint::NotNull(prop.clone()),
1240                            );
1241                        }
1242                    }
1243                }
1244
1245                wal_log!(
1246                    self,
1247                    WalRecord::CreateConstraint {
1248                        name: constraint_name.clone(),
1249                        label: stmt.label.clone(),
1250                        properties: stmt.properties.clone(),
1251                        kind: kind_str.to_string(),
1252                    }
1253                );
1254                Ok(QueryResult::status(format!(
1255                    "Created {kind_str} constraint '{constraint_name}'"
1256                )))
1257            }
1258            SchemaStatement::DropConstraint { name, if_exists } => {
1259                let _ = if_exists;
1260                wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1261                Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1262            }
1263            SchemaStatement::CreateGraphType(stmt) => {
1264                use crate::catalog::GraphTypeDefinition;
1265                use grafeo_adapters::query::gql::ast::InlineElementType;
1266
1267                let effective_name = self.effective_type_key(&stmt.name);
1268
1269                // GG04: LIKE clause copies type from existing graph
1270                let (mut node_types, mut edge_types, open) =
1271                    if let Some(ref like_graph) = stmt.like_graph {
1272                        // Infer types from the graph's bound type, or use its existing types
1273                        if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1274                            if let Some(existing) = self
1275                                .catalog
1276                                .schema()
1277                                .and_then(|s| s.get_graph_type(&type_name))
1278                            {
1279                                (
1280                                    existing.allowed_node_types.clone(),
1281                                    existing.allowed_edge_types.clone(),
1282                                    existing.open,
1283                                )
1284                            } else {
1285                                (Vec::new(), Vec::new(), true)
1286                            }
1287                        } else {
1288                            // GG22: Infer from graph data (labels used in graph)
1289                            let nt = self.catalog.all_node_type_names();
1290                            let et = self.catalog.all_edge_type_names();
1291                            if nt.is_empty() && et.is_empty() {
1292                                (Vec::new(), Vec::new(), true)
1293                            } else {
1294                                (nt, et, false)
1295                            }
1296                        }
1297                    } else {
1298                        // Prefix element type names with schema for consistency
1299                        let nt = stmt
1300                            .node_types
1301                            .iter()
1302                            .map(|n| self.effective_type_key(n))
1303                            .collect();
1304                        let et = stmt
1305                            .edge_types
1306                            .iter()
1307                            .map(|n| self.effective_type_key(n))
1308                            .collect();
1309                        (nt, et, stmt.open)
1310                    };
1311
1312                // GG03: Register inline element types and add their names
1313                for inline in &stmt.inline_types {
1314                    match inline {
1315                        InlineElementType::Node {
1316                            name,
1317                            properties,
1318                            key_labels,
1319                            ..
1320                        } => {
1321                            let inline_effective = self.effective_type_key(name);
1322                            let def = NodeTypeDefinition {
1323                                name: inline_effective.clone(),
1324                                properties: properties
1325                                    .iter()
1326                                    .map(|p| TypedProperty {
1327                                        name: p.name.clone(),
1328                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1329                                        nullable: p.nullable,
1330                                        default_value: None,
1331                                    })
1332                                    .collect(),
1333                                constraints: Vec::new(),
1334                                parent_types: key_labels.clone(),
1335                            };
1336                            // Register or replace so inline defs override existing
1337                            self.catalog.register_or_replace_node_type(def);
1338                            #[cfg(feature = "wal")]
1339                            {
1340                                let props_for_wal: Vec<(String, String, bool)> = properties
1341                                    .iter()
1342                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1343                                    .collect();
1344                                self.log_schema_wal(&WalRecord::CreateNodeType {
1345                                    name: inline_effective.clone(),
1346                                    properties: props_for_wal,
1347                                    constraints: Vec::new(),
1348                                });
1349                            }
1350                            if !node_types.contains(&inline_effective) {
1351                                node_types.push(inline_effective);
1352                            }
1353                        }
1354                        InlineElementType::Edge {
1355                            name,
1356                            properties,
1357                            source_node_types,
1358                            target_node_types,
1359                            ..
1360                        } => {
1361                            let inline_effective = self.effective_type_key(name);
1362                            let def = EdgeTypeDefinition {
1363                                name: inline_effective.clone(),
1364                                properties: properties
1365                                    .iter()
1366                                    .map(|p| TypedProperty {
1367                                        name: p.name.clone(),
1368                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1369                                        nullable: p.nullable,
1370                                        default_value: None,
1371                                    })
1372                                    .collect(),
1373                                constraints: Vec::new(),
1374                                source_node_types: source_node_types.clone(),
1375                                target_node_types: target_node_types.clone(),
1376                            };
1377                            self.catalog.register_or_replace_edge_type_def(def);
1378                            #[cfg(feature = "wal")]
1379                            {
1380                                let props_for_wal: Vec<(String, String, bool)> = properties
1381                                    .iter()
1382                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1383                                    .collect();
1384                                self.log_schema_wal(&WalRecord::CreateEdgeType {
1385                                    name: inline_effective.clone(),
1386                                    properties: props_for_wal,
1387                                    constraints: Vec::new(),
1388                                });
1389                            }
1390                            if !edge_types.contains(&inline_effective) {
1391                                edge_types.push(inline_effective);
1392                            }
1393                        }
1394                    }
1395                }
1396
1397                let def = GraphTypeDefinition {
1398                    name: effective_name.clone(),
1399                    allowed_node_types: node_types.clone(),
1400                    allowed_edge_types: edge_types.clone(),
1401                    open,
1402                };
1403                let result = if stmt.or_replace {
1404                    // Drop existing first, ignore error if not found
1405                    let _ = self.catalog.drop_graph_type(&effective_name);
1406                    self.catalog.register_graph_type(def)
1407                } else {
1408                    self.catalog.register_graph_type(def)
1409                };
1410                match result {
1411                    Ok(()) => {
1412                        wal_log!(
1413                            self,
1414                            WalRecord::CreateGraphType {
1415                                name: effective_name.clone(),
1416                                node_types,
1417                                edge_types,
1418                                open,
1419                            }
1420                        );
1421                        Ok(QueryResult::status(format!(
1422                            "Created graph type '{}'",
1423                            stmt.name
1424                        )))
1425                    }
1426                    Err(e) if stmt.if_not_exists => {
1427                        let _ = e;
1428                        Ok(QueryResult::status("No change"))
1429                    }
1430                    Err(e) => Err(Error::Query(QueryError::new(
1431                        QueryErrorKind::Semantic,
1432                        e.to_string(),
1433                    ))),
1434                }
1435            }
1436            SchemaStatement::DropGraphType { name, if_exists } => {
1437                let effective_name = self.effective_type_key(&name);
1438                match self.catalog.drop_graph_type(&effective_name) {
1439                    Ok(()) => {
1440                        wal_log!(
1441                            self,
1442                            WalRecord::DropGraphType {
1443                                name: effective_name
1444                            }
1445                        );
1446                        Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1447                    }
1448                    Err(e) if if_exists => {
1449                        let _ = e;
1450                        Ok(QueryResult::status("No change"))
1451                    }
1452                    Err(e) => Err(Error::Query(QueryError::new(
1453                        QueryErrorKind::Semantic,
1454                        e.to_string(),
1455                    ))),
1456                }
1457            }
1458            SchemaStatement::CreateSchema {
1459                name,
1460                if_not_exists,
1461            } => match self.catalog.register_schema_namespace(name.clone()) {
1462                Ok(()) => {
1463                    wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1464                    Ok(QueryResult::status(format!("Created schema '{name}'")))
1465                }
1466                Err(e) if if_not_exists => {
1467                    let _ = e;
1468                    Ok(QueryResult::status("No change"))
1469                }
1470                Err(e) => Err(Error::Query(QueryError::new(
1471                    QueryErrorKind::Semantic,
1472                    e.to_string(),
1473                ))),
1474            },
1475            SchemaStatement::DropSchema { name, if_exists } => {
1476                // ISO/IEC 39075 Section 12.3: schema must be empty before dropping
1477                let prefix = format!("{name}/");
1478                let has_graphs = self
1479                    .store
1480                    .graph_names()
1481                    .iter()
1482                    .any(|g| g.starts_with(&prefix));
1483                let has_types = self
1484                    .catalog
1485                    .all_node_type_names()
1486                    .iter()
1487                    .any(|n| n.starts_with(&prefix))
1488                    || self
1489                        .catalog
1490                        .all_edge_type_names()
1491                        .iter()
1492                        .any(|n| n.starts_with(&prefix))
1493                    || self
1494                        .catalog
1495                        .all_graph_type_names()
1496                        .iter()
1497                        .any(|n| n.starts_with(&prefix));
1498                if has_graphs || has_types {
1499                    return Err(Error::Query(QueryError::new(
1500                        QueryErrorKind::Semantic,
1501                        format!("Schema '{name}' is not empty: drop all graphs and types first"),
1502                    )));
1503                }
1504                match self.catalog.drop_schema_namespace(&name) {
1505                    Ok(()) => {
1506                        wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1507                        // If this session was using the dropped schema, reset it
1508                        let mut current = self.current_schema.lock();
1509                        if current
1510                            .as_deref()
1511                            .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1512                        {
1513                            *current = None;
1514                        }
1515                        Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1516                    }
1517                    Err(e) if if_exists => {
1518                        let _ = e;
1519                        Ok(QueryResult::status("No change"))
1520                    }
1521                    Err(e) => Err(Error::Query(QueryError::new(
1522                        QueryErrorKind::Semantic,
1523                        e.to_string(),
1524                    ))),
1525                }
1526            }
1527            SchemaStatement::AlterNodeType(stmt) => {
1528                use grafeo_adapters::query::gql::ast::TypeAlteration;
1529                let effective_name = self.effective_type_key(&stmt.name);
1530                let mut wal_alts = Vec::new();
1531                for alt in &stmt.alterations {
1532                    match alt {
1533                        TypeAlteration::AddProperty(prop) => {
1534                            let typed = TypedProperty {
1535                                name: prop.name.clone(),
1536                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1537                                nullable: prop.nullable,
1538                                default_value: prop
1539                                    .default_value
1540                                    .as_ref()
1541                                    .map(|s| parse_default_literal(s)),
1542                            };
1543                            self.catalog
1544                                .alter_node_type_add_property(&effective_name, typed)
1545                                .map_err(|e| {
1546                                    Error::Query(QueryError::new(
1547                                        QueryErrorKind::Semantic,
1548                                        e.to_string(),
1549                                    ))
1550                                })?;
1551                            wal_alts.push((
1552                                "add".to_string(),
1553                                prop.name.clone(),
1554                                prop.data_type.clone(),
1555                                prop.nullable,
1556                            ));
1557                        }
1558                        TypeAlteration::DropProperty(name) => {
1559                            self.catalog
1560                                .alter_node_type_drop_property(&effective_name, name)
1561                                .map_err(|e| {
1562                                    Error::Query(QueryError::new(
1563                                        QueryErrorKind::Semantic,
1564                                        e.to_string(),
1565                                    ))
1566                                })?;
1567                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1568                        }
1569                    }
1570                }
1571                wal_log!(
1572                    self,
1573                    WalRecord::AlterNodeType {
1574                        name: effective_name,
1575                        alterations: wal_alts,
1576                    }
1577                );
1578                Ok(QueryResult::status(format!(
1579                    "Altered node type '{}'",
1580                    stmt.name
1581                )))
1582            }
1583            SchemaStatement::AlterEdgeType(stmt) => {
1584                use grafeo_adapters::query::gql::ast::TypeAlteration;
1585                let effective_name = self.effective_type_key(&stmt.name);
1586                let mut wal_alts = Vec::new();
1587                for alt in &stmt.alterations {
1588                    match alt {
1589                        TypeAlteration::AddProperty(prop) => {
1590                            let typed = TypedProperty {
1591                                name: prop.name.clone(),
1592                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1593                                nullable: prop.nullable,
1594                                default_value: prop
1595                                    .default_value
1596                                    .as_ref()
1597                                    .map(|s| parse_default_literal(s)),
1598                            };
1599                            self.catalog
1600                                .alter_edge_type_add_property(&effective_name, typed)
1601                                .map_err(|e| {
1602                                    Error::Query(QueryError::new(
1603                                        QueryErrorKind::Semantic,
1604                                        e.to_string(),
1605                                    ))
1606                                })?;
1607                            wal_alts.push((
1608                                "add".to_string(),
1609                                prop.name.clone(),
1610                                prop.data_type.clone(),
1611                                prop.nullable,
1612                            ));
1613                        }
1614                        TypeAlteration::DropProperty(name) => {
1615                            self.catalog
1616                                .alter_edge_type_drop_property(&effective_name, name)
1617                                .map_err(|e| {
1618                                    Error::Query(QueryError::new(
1619                                        QueryErrorKind::Semantic,
1620                                        e.to_string(),
1621                                    ))
1622                                })?;
1623                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1624                        }
1625                    }
1626                }
1627                wal_log!(
1628                    self,
1629                    WalRecord::AlterEdgeType {
1630                        name: effective_name,
1631                        alterations: wal_alts,
1632                    }
1633                );
1634                Ok(QueryResult::status(format!(
1635                    "Altered edge type '{}'",
1636                    stmt.name
1637                )))
1638            }
1639            SchemaStatement::AlterGraphType(stmt) => {
1640                use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1641                let effective_name = self.effective_type_key(&stmt.name);
1642                let mut wal_alts = Vec::new();
1643                for alt in &stmt.alterations {
1644                    match alt {
1645                        GraphTypeAlteration::AddNodeType(name) => {
1646                            self.catalog
1647                                .alter_graph_type_add_node_type(&effective_name, name.clone())
1648                                .map_err(|e| {
1649                                    Error::Query(QueryError::new(
1650                                        QueryErrorKind::Semantic,
1651                                        e.to_string(),
1652                                    ))
1653                                })?;
1654                            wal_alts.push(("add_node_type".to_string(), name.clone()));
1655                        }
1656                        GraphTypeAlteration::DropNodeType(name) => {
1657                            self.catalog
1658                                .alter_graph_type_drop_node_type(&effective_name, name)
1659                                .map_err(|e| {
1660                                    Error::Query(QueryError::new(
1661                                        QueryErrorKind::Semantic,
1662                                        e.to_string(),
1663                                    ))
1664                                })?;
1665                            wal_alts.push(("drop_node_type".to_string(), name.clone()));
1666                        }
1667                        GraphTypeAlteration::AddEdgeType(name) => {
1668                            self.catalog
1669                                .alter_graph_type_add_edge_type(&effective_name, name.clone())
1670                                .map_err(|e| {
1671                                    Error::Query(QueryError::new(
1672                                        QueryErrorKind::Semantic,
1673                                        e.to_string(),
1674                                    ))
1675                                })?;
1676                            wal_alts.push(("add_edge_type".to_string(), name.clone()));
1677                        }
1678                        GraphTypeAlteration::DropEdgeType(name) => {
1679                            self.catalog
1680                                .alter_graph_type_drop_edge_type(&effective_name, name)
1681                                .map_err(|e| {
1682                                    Error::Query(QueryError::new(
1683                                        QueryErrorKind::Semantic,
1684                                        e.to_string(),
1685                                    ))
1686                                })?;
1687                            wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1688                        }
1689                    }
1690                }
1691                wal_log!(
1692                    self,
1693                    WalRecord::AlterGraphType {
1694                        name: effective_name,
1695                        alterations: wal_alts,
1696                    }
1697                );
1698                Ok(QueryResult::status(format!(
1699                    "Altered graph type '{}'",
1700                    stmt.name
1701                )))
1702            }
1703            SchemaStatement::CreateProcedure(stmt) => {
1704                use crate::catalog::ProcedureDefinition;
1705
1706                let def = ProcedureDefinition {
1707                    name: stmt.name.clone(),
1708                    params: stmt
1709                        .params
1710                        .iter()
1711                        .map(|p| (p.name.clone(), p.param_type.clone()))
1712                        .collect(),
1713                    returns: stmt
1714                        .returns
1715                        .iter()
1716                        .map(|r| (r.name.clone(), r.return_type.clone()))
1717                        .collect(),
1718                    body: stmt.body.clone(),
1719                };
1720
1721                if stmt.or_replace {
1722                    self.catalog.replace_procedure(def).map_err(|e| {
1723                        Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1724                    })?;
1725                } else {
1726                    match self.catalog.register_procedure(def) {
1727                        Ok(()) => {}
1728                        Err(_) if stmt.if_not_exists => {
1729                            return Ok(QueryResult::empty());
1730                        }
1731                        Err(e) => {
1732                            return Err(Error::Query(QueryError::new(
1733                                QueryErrorKind::Semantic,
1734                                e.to_string(),
1735                            )));
1736                        }
1737                    }
1738                }
1739
1740                wal_log!(
1741                    self,
1742                    WalRecord::CreateProcedure {
1743                        name: stmt.name.clone(),
1744                        params: stmt
1745                            .params
1746                            .iter()
1747                            .map(|p| (p.name.clone(), p.param_type.clone()))
1748                            .collect(),
1749                        returns: stmt
1750                            .returns
1751                            .iter()
1752                            .map(|r| (r.name.clone(), r.return_type.clone()))
1753                            .collect(),
1754                        body: stmt.body,
1755                    }
1756                );
1757                Ok(QueryResult::status(format!(
1758                    "Created procedure '{}'",
1759                    stmt.name
1760                )))
1761            }
1762            SchemaStatement::DropProcedure { name, if_exists } => {
1763                match self.catalog.drop_procedure(&name) {
1764                    Ok(()) => {}
1765                    Err(_) if if_exists => {
1766                        return Ok(QueryResult::empty());
1767                    }
1768                    Err(e) => {
1769                        return Err(Error::Query(QueryError::new(
1770                            QueryErrorKind::Semantic,
1771                            e.to_string(),
1772                        )));
1773                    }
1774                }
1775                wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1776                Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1777            }
1778            SchemaStatement::ShowIndexes => {
1779                return self.execute_show_indexes();
1780            }
1781            SchemaStatement::ShowConstraints => {
1782                return self.execute_show_constraints();
1783            }
1784            SchemaStatement::ShowNodeTypes => {
1785                return self.execute_show_node_types();
1786            }
1787            SchemaStatement::ShowEdgeTypes => {
1788                return self.execute_show_edge_types();
1789            }
1790            SchemaStatement::ShowGraphTypes => {
1791                return self.execute_show_graph_types();
1792            }
1793            SchemaStatement::ShowGraphType(name) => {
1794                return self.execute_show_graph_type(&name);
1795            }
1796            SchemaStatement::ShowCurrentGraphType => {
1797                return self.execute_show_current_graph_type();
1798            }
1799            SchemaStatement::ShowGraphs => {
1800                return self.execute_show_graphs();
1801            }
1802            SchemaStatement::ShowSchemas => {
1803                return self.execute_show_schemas();
1804            }
1805        };
1806
1807        // Invalidate all cached query plans after any successful DDL change.
1808        // DDL is rare, so clearing the entire cache is cheap and correct.
1809        if result.is_ok() {
1810            self.query_cache.clear();
1811        }
1812
1813        result
1814    }
1815
1816    /// Creates a vector index on the store by scanning existing nodes.
1817    #[cfg(all(feature = "gql", feature = "vector-index"))]
1818    fn create_vector_index_on_store(
1819        store: &LpgStore,
1820        label: &str,
1821        property: &str,
1822        dimensions: Option<usize>,
1823        metric: Option<&str>,
1824    ) -> Result<()> {
1825        use grafeo_common::types::{PropertyKey, Value};
1826        use grafeo_common::utils::error::Error;
1827        use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};
1828
1829        let metric = match metric {
1830            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1831                Error::Internal(format!(
1832                    "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
1833                ))
1834            })?,
1835            None => DistanceMetric::Cosine,
1836        };
1837
1838        let prop_key = PropertyKey::new(property);
1839        let mut found_dims: Option<usize> = dimensions;
1840        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1841
1842        for node in store.nodes_with_label(label) {
1843            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1844                if let Some(expected) = found_dims {
1845                    if v.len() != expected {
1846                        return Err(Error::Internal(format!(
1847                            "Vector dimension mismatch: expected {expected}, found {} on node {}",
1848                            v.len(),
1849                            node.id.0
1850                        )));
1851                    }
1852                } else {
1853                    found_dims = Some(v.len());
1854                }
1855                vectors.push((node.id, v.to_vec()));
1856            }
1857        }
1858
1859        let Some(dims) = found_dims else {
1860            return Err(Error::Internal(format!(
1861                "No vector properties found on :{label}({property}) and no dimensions specified"
1862            )));
1863        };
1864
1865        let config = HnswConfig::new(dims, metric);
1866        let index = HnswIndex::with_capacity(config, vectors.len());
1867        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
1868        for (node_id, vec) in &vectors {
1869            index.insert(*node_id, vec, &accessor);
1870        }
1871
1872        store.add_vector_index(label, property, Arc::new(index));
1873        Ok(())
1874    }
1875
1876    /// Stub for when vector-index feature is not enabled.
1877    #[cfg(all(feature = "gql", not(feature = "vector-index")))]
1878    fn create_vector_index_on_store(
1879        _store: &LpgStore,
1880        _label: &str,
1881        _property: &str,
1882        _dimensions: Option<usize>,
1883        _metric: Option<&str>,
1884    ) -> Result<()> {
1885        Err(grafeo_common::utils::error::Error::Internal(
1886            "Vector index support requires the 'vector-index' feature".to_string(),
1887        ))
1888    }
1889
1890    /// Creates a text index on the store by scanning existing nodes.
1891    #[cfg(all(feature = "gql", feature = "text-index"))]
1892    fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
1893        use grafeo_common::types::{PropertyKey, Value};
1894        use grafeo_core::index::text::{BM25Config, InvertedIndex};
1895
1896        let mut index = InvertedIndex::new(BM25Config::default());
1897        let prop_key = PropertyKey::new(property);
1898
1899        let nodes = store.nodes_by_label(label);
1900        for node_id in nodes {
1901            if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
1902                index.insert(node_id, text.as_str());
1903            }
1904        }
1905
1906        store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
1907        Ok(())
1908    }
1909
1910    /// Stub for when text-index feature is not enabled.
1911    #[cfg(all(feature = "gql", not(feature = "text-index")))]
1912    fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
1913        Err(grafeo_common::utils::error::Error::Internal(
1914            "Text index support requires the 'text-index' feature".to_string(),
1915        ))
1916    }
1917
1918    /// Returns a table of all indexes from the catalog.
1919    fn execute_show_indexes(&self) -> Result<QueryResult> {
1920        let indexes = self.catalog.all_indexes();
1921        let columns = vec![
1922            "name".to_string(),
1923            "type".to_string(),
1924            "label".to_string(),
1925            "property".to_string(),
1926        ];
1927        let rows: Vec<Vec<Value>> = indexes
1928            .into_iter()
1929            .map(|def| {
1930                let label_name = self
1931                    .catalog
1932                    .get_label_name(def.label)
1933                    .unwrap_or_else(|| "?".into());
1934                let prop_name = self
1935                    .catalog
1936                    .get_property_key_name(def.property_key)
1937                    .unwrap_or_else(|| "?".into());
1938                vec![
1939                    Value::from(def.name),
1940                    Value::from(format!("{:?}", def.index_type)),
1941                    Value::from(&*label_name),
1942                    Value::from(&*prop_name),
1943                ]
1944            })
1945            .collect();
1946        Ok(QueryResult {
1947            columns,
1948            column_types: Vec::new(),
1949            rows,
1950            ..QueryResult::empty()
1951        })
1952    }
1953
1954    /// Returns a table of all constraints (currently metadata-only).
1955    fn execute_show_constraints(&self) -> Result<QueryResult> {
1956        // Constraints are tracked in WAL but not yet in a queryable catalog.
1957        // Return an empty table with the expected schema.
1958        Ok(QueryResult {
1959            columns: vec![
1960                "name".to_string(),
1961                "type".to_string(),
1962                "label".to_string(),
1963                "properties".to_string(),
1964            ],
1965            column_types: Vec::new(),
1966            rows: Vec::new(),
1967            ..QueryResult::empty()
1968        })
1969    }
1970
1971    /// Returns a table of all registered node types in the current schema.
1972    fn execute_show_node_types(&self) -> Result<QueryResult> {
1973        let columns = vec![
1974            "name".to_string(),
1975            "properties".to_string(),
1976            "constraints".to_string(),
1977            "parents".to_string(),
1978        ];
1979        let schema = self.current_schema.lock().clone();
1980        let all_names = self.catalog.all_node_type_names();
1981        let type_names: Vec<String> = match &schema {
1982            Some(s) => {
1983                let prefix = format!("{s}/");
1984                all_names
1985                    .into_iter()
1986                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
1987                    .collect()
1988            }
1989            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
1990        };
1991        let rows: Vec<Vec<Value>> = type_names
1992            .into_iter()
1993            .filter_map(|name| {
1994                let lookup = match &schema {
1995                    Some(s) => format!("{s}/{name}"),
1996                    None => name.clone(),
1997                };
1998                let def = self.catalog.get_node_type(&lookup)?;
1999                let props: Vec<String> = def
2000                    .properties
2001                    .iter()
2002                    .map(|p| {
2003                        let nullable = if p.nullable { "" } else { " NOT NULL" };
2004                        format!("{} {}{}", p.name, p.data_type, nullable)
2005                    })
2006                    .collect();
2007                let constraints: Vec<String> =
2008                    def.constraints.iter().map(|c| format!("{c:?}")).collect();
2009                let parents = def.parent_types.join(", ");
2010                Some(vec![
2011                    Value::from(name),
2012                    Value::from(props.join(", ")),
2013                    Value::from(constraints.join(", ")),
2014                    Value::from(parents),
2015                ])
2016            })
2017            .collect();
2018        Ok(QueryResult {
2019            columns,
2020            column_types: Vec::new(),
2021            rows,
2022            ..QueryResult::empty()
2023        })
2024    }
2025
2026    /// Returns a table of all registered edge types in the current schema.
2027    fn execute_show_edge_types(&self) -> Result<QueryResult> {
2028        let columns = vec![
2029            "name".to_string(),
2030            "properties".to_string(),
2031            "source_types".to_string(),
2032            "target_types".to_string(),
2033        ];
2034        let schema = self.current_schema.lock().clone();
2035        let all_names = self.catalog.all_edge_type_names();
2036        let type_names: Vec<String> = match &schema {
2037            Some(s) => {
2038                let prefix = format!("{s}/");
2039                all_names
2040                    .into_iter()
2041                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2042                    .collect()
2043            }
2044            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2045        };
2046        let rows: Vec<Vec<Value>> = type_names
2047            .into_iter()
2048            .filter_map(|name| {
2049                let lookup = match &schema {
2050                    Some(s) => format!("{s}/{name}"),
2051                    None => name.clone(),
2052                };
2053                let def = self.catalog.get_edge_type_def(&lookup)?;
2054                let props: Vec<String> = def
2055                    .properties
2056                    .iter()
2057                    .map(|p| {
2058                        let nullable = if p.nullable { "" } else { " NOT NULL" };
2059                        format!("{} {}{}", p.name, p.data_type, nullable)
2060                    })
2061                    .collect();
2062                let src = def.source_node_types.join(", ");
2063                let tgt = def.target_node_types.join(", ");
2064                Some(vec![
2065                    Value::from(name),
2066                    Value::from(props.join(", ")),
2067                    Value::from(src),
2068                    Value::from(tgt),
2069                ])
2070            })
2071            .collect();
2072        Ok(QueryResult {
2073            columns,
2074            column_types: Vec::new(),
2075            rows,
2076            ..QueryResult::empty()
2077        })
2078    }
2079
2080    /// Returns a table of all registered graph types in the current schema.
2081    fn execute_show_graph_types(&self) -> Result<QueryResult> {
2082        let columns = vec![
2083            "name".to_string(),
2084            "open".to_string(),
2085            "node_types".to_string(),
2086            "edge_types".to_string(),
2087        ];
2088        let schema = self.current_schema.lock().clone();
2089        let all_names = self.catalog.all_graph_type_names();
2090        let type_names: Vec<String> = match &schema {
2091            Some(s) => {
2092                let prefix = format!("{s}/");
2093                all_names
2094                    .into_iter()
2095                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2096                    .collect()
2097            }
2098            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2099        };
2100        let rows: Vec<Vec<Value>> = type_names
2101            .into_iter()
2102            .filter_map(|name| {
2103                let lookup = match &schema {
2104                    Some(s) => format!("{s}/{name}"),
2105                    None => name.clone(),
2106                };
2107                let def = self.catalog.get_graph_type_def(&lookup)?;
2108                // Strip schema prefix from allowed type names for display
2109                let strip = |n: &String| -> String {
2110                    match &schema {
2111                        Some(s) => n.strip_prefix(&format!("{s}/")).unwrap_or(n).to_string(),
2112                        None => n.clone(),
2113                    }
2114                };
2115                let node_types: Vec<String> = def.allowed_node_types.iter().map(strip).collect();
2116                let edge_types: Vec<String> = def.allowed_edge_types.iter().map(strip).collect();
2117                Some(vec![
2118                    Value::from(name),
2119                    Value::from(def.open),
2120                    Value::from(node_types.join(", ")),
2121                    Value::from(edge_types.join(", ")),
2122                ])
2123            })
2124            .collect();
2125        Ok(QueryResult {
2126            columns,
2127            column_types: Vec::new(),
2128            rows,
2129            ..QueryResult::empty()
2130        })
2131    }
2132
2133    /// Returns the list of named graphs visible in the current schema context.
2134    ///
2135    /// When a session schema is set, only graphs belonging to that schema are
2136    /// shown (their compound prefix is stripped). When no schema is set, graphs
2137    /// without a schema prefix are shown (the default schema).
2138    fn execute_show_graphs(&self) -> Result<QueryResult> {
2139        let schema = self.current_schema.lock().clone();
2140        let all_names = self.store.graph_names();
2141
2142        let mut names: Vec<String> = match &schema {
2143            Some(s) => {
2144                let prefix = format!("{s}/");
2145                all_names
2146                    .into_iter()
2147                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2148                    .collect()
2149            }
2150            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2151        };
2152        names.sort();
2153
2154        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2155        Ok(QueryResult {
2156            columns: vec!["name".to_string()],
2157            column_types: Vec::new(),
2158            rows,
2159            ..QueryResult::empty()
2160        })
2161    }
2162
2163    /// Returns the list of all schema namespaces.
2164    fn execute_show_schemas(&self) -> Result<QueryResult> {
2165        let mut names = self.catalog.schema_names();
2166        names.sort();
2167        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2168        Ok(QueryResult {
2169            columns: vec!["name".to_string()],
2170            column_types: Vec::new(),
2171            rows,
2172            ..QueryResult::empty()
2173        })
2174    }
2175
2176    /// Returns detailed info for a specific graph type.
2177    fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
2178        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
2179
2180        let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
2181            Error::Query(QueryError::new(
2182                QueryErrorKind::Semantic,
2183                format!("Graph type '{name}' not found"),
2184            ))
2185        })?;
2186
2187        let columns = vec![
2188            "name".to_string(),
2189            "open".to_string(),
2190            "node_types".to_string(),
2191            "edge_types".to_string(),
2192        ];
2193        let rows = vec![vec![
2194            Value::from(def.name),
2195            Value::from(def.open),
2196            Value::from(def.allowed_node_types.join(", ")),
2197            Value::from(def.allowed_edge_types.join(", ")),
2198        ]];
2199        Ok(QueryResult {
2200            columns,
2201            column_types: Vec::new(),
2202            rows,
2203            ..QueryResult::empty()
2204        })
2205    }
2206
2207    /// Returns the graph type bound to the current graph.
2208    fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
2209        let graph_name = self
2210            .current_graph()
2211            .unwrap_or_else(|| "default".to_string());
2212        let columns = vec![
2213            "graph".to_string(),
2214            "graph_type".to_string(),
2215            "open".to_string(),
2216            "node_types".to_string(),
2217            "edge_types".to_string(),
2218        ];
2219
2220        if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
2221            && let Some(def) = self.catalog.get_graph_type_def(&type_name)
2222        {
2223            let rows = vec![vec![
2224                Value::from(graph_name),
2225                Value::from(type_name),
2226                Value::from(def.open),
2227                Value::from(def.allowed_node_types.join(", ")),
2228                Value::from(def.allowed_edge_types.join(", ")),
2229            ]];
2230            return Ok(QueryResult {
2231                columns,
2232                column_types: Vec::new(),
2233                rows,
2234                ..QueryResult::empty()
2235            });
2236        }
2237
2238        // No graph type binding found
2239        Ok(QueryResult {
2240            columns,
2241            column_types: Vec::new(),
2242            rows: vec![vec![
2243                Value::from(graph_name),
2244                Value::Null,
2245                Value::Null,
2246                Value::Null,
2247                Value::Null,
2248            ]],
2249            ..QueryResult::empty()
2250        })
2251    }
2252
2253    /// Executes a GQL query.
2254    ///
2255    /// # Errors
2256    ///
2257    /// Returns an error if the query fails to parse or execute.
2258    ///
2259    /// # Examples
2260    ///
2261    /// ```no_run
2262    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2263    /// use grafeo_engine::GrafeoDB;
2264    ///
2265    /// let db = GrafeoDB::new_in_memory();
2266    /// let session = db.session();
2267    ///
2268    /// // Create a node
2269    /// session.execute("INSERT (:Person {name: 'Alix', age: 30})")?;
2270    ///
2271    /// // Query nodes
2272    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
2273    /// for row in &result.rows {
2274    ///     println!("{:?}", row);
2275    /// }
2276    /// # Ok(())
2277    /// # }
2278    /// ```
2279    #[cfg(feature = "gql")]
2280    pub fn execute(&self, query: &str) -> Result<QueryResult> {
2281        self.require_lpg("GQL")?;
2282
2283        use crate::query::{
2284            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2285            processor::QueryLanguage, translators::gql,
2286        };
2287
2288        let _span = grafeo_info_span!(
2289            "grafeo::session::execute",
2290            language = "gql",
2291            query_len = query.len(),
2292        );
2293
2294        #[cfg(not(target_arch = "wasm32"))]
2295        let start_time = std::time::Instant::now();
2296
2297        // Parse and translate, checking for session/schema commands first
2298        let translation = gql::translate_full(query)?;
2299        let logical_plan = match translation {
2300            gql::GqlTranslationResult::SessionCommand(cmd) => {
2301                return self.execute_session_command(cmd);
2302            }
2303            gql::GqlTranslationResult::SchemaCommand(cmd) => {
2304                // All DDL is a write operation
2305                if *self.read_only_tx.lock() {
2306                    return Err(grafeo_common::utils::error::Error::Transaction(
2307                        grafeo_common::utils::error::TransactionError::ReadOnly,
2308                    ));
2309                }
2310                return self.execute_schema_command(cmd);
2311            }
2312            gql::GqlTranslationResult::Plan(plan) => {
2313                // Block mutations in read-only transactions
2314                if *self.read_only_tx.lock() && plan.root.has_mutations() {
2315                    return Err(grafeo_common::utils::error::Error::Transaction(
2316                        grafeo_common::utils::error::TransactionError::ReadOnly,
2317                    ));
2318                }
2319                plan
2320            }
2321        };
2322
2323        // Create cache key for this query
2324        let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
2325
2326        // Try to get cached optimized plan, or use the plan we just translated
2327        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2328            cached_plan
2329        } else {
2330            // Semantic validation
2331            let mut binder = Binder::new();
2332            let _binding_context = binder.bind(&logical_plan)?;
2333
2334            // Optimize the plan
2335            let active = self.active_store();
2336            let optimizer = Optimizer::from_graph_store(&*active);
2337            let plan = optimizer.optimize(logical_plan)?;
2338
2339            // Cache the optimized plan for future use
2340            self.query_cache.put_optimized(cache_key, plan.clone());
2341
2342            plan
2343        };
2344
2345        // Resolve the active store for query execution
2346        let active = self.active_store();
2347
2348        // EXPLAIN: annotate pushdown hints and return the plan tree
2349        if optimized_plan.explain {
2350            use crate::query::processor::{annotate_pushdown_hints, explain_result};
2351            let mut plan = optimized_plan;
2352            annotate_pushdown_hints(&mut plan.root, active.as_ref());
2353            return Ok(explain_result(&plan));
2354        }
2355
2356        // PROFILE: execute with per-operator instrumentation
2357        if optimized_plan.profile {
2358            let has_mutations = optimized_plan.root.has_mutations();
2359            return self.with_auto_commit(has_mutations, || {
2360                let (viewing_epoch, transaction_id) = self.get_transaction_context();
2361                let planner = self.create_planner_for_store(
2362                    Arc::clone(&active),
2363                    viewing_epoch,
2364                    transaction_id,
2365                );
2366                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2367
2368                let executor = Executor::with_columns(physical_plan.columns.clone())
2369                    .with_deadline(self.query_deadline());
2370                let _result = executor.execute(physical_plan.operator.as_mut())?;
2371
2372                let total_time_ms;
2373                #[cfg(not(target_arch = "wasm32"))]
2374                {
2375                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2376                }
2377                #[cfg(target_arch = "wasm32")]
2378                {
2379                    total_time_ms = 0.0;
2380                }
2381
2382                let profile_tree = crate::query::profile::build_profile_tree(
2383                    &optimized_plan.root,
2384                    &mut entries.into_iter(),
2385                );
2386                Ok(crate::query::profile::profile_result(
2387                    &profile_tree,
2388                    total_time_ms,
2389                ))
2390            });
2391        }
2392
2393        let has_mutations = optimized_plan.root.has_mutations();
2394
2395        let result = self.with_auto_commit(has_mutations, || {
2396            // Get transaction context for MVCC visibility
2397            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2398
2399            // Convert to physical plan with transaction context
2400            // (Physical planning cannot be cached as it depends on transaction state)
2401            // Safe to use read-only fast path when: this query has no mutations AND
2402            // there is no active transaction that may have prior uncommitted writes.
2403            let has_active_tx = self.current_transaction.lock().is_some();
2404            let read_only = !has_mutations && !has_active_tx;
2405            let planner = self.create_planner_for_store_with_read_only(
2406                Arc::clone(&active),
2407                viewing_epoch,
2408                transaction_id,
2409                read_only,
2410            );
2411            let mut physical_plan = planner.plan(&optimized_plan)?;
2412
2413            // Execute the plan
2414            let executor = Executor::with_columns(physical_plan.columns.clone())
2415                .with_deadline(self.query_deadline());
2416            let mut result = executor.execute(physical_plan.operator.as_mut())?;
2417
2418            // Add execution metrics
2419            let rows_scanned = result.rows.len() as u64;
2420            #[cfg(not(target_arch = "wasm32"))]
2421            {
2422                let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2423                result.execution_time_ms = Some(elapsed_ms);
2424            }
2425            result.rows_scanned = Some(rows_scanned);
2426
2427            Ok(result)
2428        });
2429
2430        // Record metrics for this query execution.
2431        #[cfg(feature = "metrics")]
2432        {
2433            #[cfg(not(target_arch = "wasm32"))]
2434            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2435            #[cfg(target_arch = "wasm32")]
2436            let elapsed_ms = None;
2437            self.record_query_metrics("gql", elapsed_ms, &result);
2438        }
2439
2440        result
2441    }
2442
2443    /// Executes a GQL query with visibility at the specified epoch.
2444    ///
2445    /// This enables time-travel queries: the query sees the database
2446    /// as it existed at the given epoch.
2447    ///
2448    /// # Errors
2449    ///
2450    /// Returns an error if parsing or execution fails.
2451    #[cfg(feature = "gql")]
2452    pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
2453        let previous = self.viewing_epoch_override.lock().replace(epoch);
2454        let result = self.execute(query);
2455        *self.viewing_epoch_override.lock() = previous;
2456        result
2457    }
2458
2459    /// Executes a GQL query at a specific epoch with optional parameters.
2460    ///
2461    /// Combines epoch-based time travel with parameterized queries.
2462    ///
2463    /// # Errors
2464    ///
2465    /// Returns an error if parsing or execution fails.
2466    #[cfg(feature = "gql")]
2467    pub fn execute_at_epoch_with_params(
2468        &self,
2469        query: &str,
2470        epoch: EpochId,
2471        params: Option<std::collections::HashMap<String, Value>>,
2472    ) -> Result<QueryResult> {
2473        let previous = self.viewing_epoch_override.lock().replace(epoch);
2474        let result = if let Some(p) = params {
2475            self.execute_with_params(query, p)
2476        } else {
2477            self.execute(query)
2478        };
2479        *self.viewing_epoch_override.lock() = previous;
2480        result
2481    }
2482
2483    /// Executes a GQL query with parameters.
2484    ///
2485    /// # Errors
2486    ///
2487    /// Returns an error if the query fails to parse or execute.
2488    #[cfg(feature = "gql")]
2489    pub fn execute_with_params(
2490        &self,
2491        query: &str,
2492        params: std::collections::HashMap<String, Value>,
2493    ) -> Result<QueryResult> {
2494        self.require_lpg("GQL")?;
2495
2496        use crate::query::processor::{QueryLanguage, QueryProcessor};
2497
2498        let has_mutations = Self::query_looks_like_mutation(query);
2499        let active = self.active_store();
2500
2501        self.with_auto_commit(has_mutations, || {
2502            // Get transaction context for MVCC visibility
2503            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2504
2505            // Create processor with transaction context
2506            let processor = QueryProcessor::for_stores_with_transaction(
2507                Arc::clone(&active),
2508                self.active_write_store(),
2509                Arc::clone(&self.transaction_manager),
2510            )?;
2511
2512            // Apply transaction context if in a transaction
2513            let processor = if let Some(transaction_id) = transaction_id {
2514                processor.with_transaction_context(viewing_epoch, transaction_id)
2515            } else {
2516                processor
2517            };
2518
2519            processor.process(query, QueryLanguage::Gql, Some(&params))
2520        })
2521    }
2522
2523    /// Executes a GQL query with parameters.
2524    ///
2525    /// # Errors
2526    ///
2527    /// Returns an error if no query language is enabled.
2528    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2529    pub fn execute_with_params(
2530        &self,
2531        _query: &str,
2532        _params: std::collections::HashMap<String, Value>,
2533    ) -> Result<QueryResult> {
2534        Err(grafeo_common::utils::error::Error::Internal(
2535            "No query language enabled".to_string(),
2536        ))
2537    }
2538
2539    /// Executes a GQL query.
2540    ///
2541    /// # Errors
2542    ///
2543    /// Returns an error if no query language is enabled.
2544    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2545    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2546        Err(grafeo_common::utils::error::Error::Internal(
2547            "No query language enabled".to_string(),
2548        ))
2549    }
2550
2551    /// Executes a Cypher query.
2552    ///
2553    /// # Errors
2554    ///
2555    /// Returns an error if the query fails to parse or execute.
2556    #[cfg(feature = "cypher")]
2557    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
2558        use crate::query::{
2559            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2560            processor::QueryLanguage, translators::cypher,
2561        };
2562        use grafeo_common::utils::error::{Error as GrafeoError, QueryError, QueryErrorKind};
2563
2564        // Handle schema DDL and SHOW commands before the normal query path
2565        let translation = cypher::translate_full(query)?;
2566        match translation {
2567            cypher::CypherTranslationResult::SchemaCommand(cmd) => {
2568                if *self.read_only_tx.lock() {
2569                    return Err(GrafeoError::Query(QueryError::new(
2570                        QueryErrorKind::Semantic,
2571                        "Cannot execute schema DDL in a read-only transaction",
2572                    )));
2573                }
2574                return self.execute_schema_command(cmd);
2575            }
2576            cypher::CypherTranslationResult::ShowIndexes => {
2577                return self.execute_show_indexes();
2578            }
2579            cypher::CypherTranslationResult::ShowConstraints => {
2580                return self.execute_show_constraints();
2581            }
2582            cypher::CypherTranslationResult::ShowCurrentGraphType => {
2583                return self.execute_show_current_graph_type();
2584            }
2585            cypher::CypherTranslationResult::Plan(_) => {
2586                // Fall through to normal execution below
2587            }
2588        }
2589
2590        #[cfg(not(target_arch = "wasm32"))]
2591        let start_time = std::time::Instant::now();
2592
2593        // Create cache key for this query
2594        let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());
2595
2596        // Try to get cached optimized plan
2597        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2598            cached_plan
2599        } else {
2600            // Parse and translate the query to a logical plan
2601            let logical_plan = cypher::translate(query)?;
2602
2603            // Semantic validation
2604            let mut binder = Binder::new();
2605            let _binding_context = binder.bind(&logical_plan)?;
2606
2607            // Optimize the plan
2608            let active = self.active_store();
2609            let optimizer = Optimizer::from_graph_store(&*active);
2610            let plan = optimizer.optimize(logical_plan)?;
2611
2612            // Cache the optimized plan
2613            self.query_cache.put_optimized(cache_key, plan.clone());
2614
2615            plan
2616        };
2617
2618        // Resolve the active store for query execution
2619        let active = self.active_store();
2620
2621        // EXPLAIN
2622        if optimized_plan.explain {
2623            use crate::query::processor::{annotate_pushdown_hints, explain_result};
2624            let mut plan = optimized_plan;
2625            annotate_pushdown_hints(&mut plan.root, active.as_ref());
2626            return Ok(explain_result(&plan));
2627        }
2628
2629        // PROFILE
2630        if optimized_plan.profile {
2631            let has_mutations = optimized_plan.root.has_mutations();
2632            return self.with_auto_commit(has_mutations, || {
2633                let (viewing_epoch, transaction_id) = self.get_transaction_context();
2634                let planner = self.create_planner_for_store(
2635                    Arc::clone(&active),
2636                    viewing_epoch,
2637                    transaction_id,
2638                );
2639                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2640
2641                let executor = Executor::with_columns(physical_plan.columns.clone())
2642                    .with_deadline(self.query_deadline());
2643                let _result = executor.execute(physical_plan.operator.as_mut())?;
2644
2645                let total_time_ms;
2646                #[cfg(not(target_arch = "wasm32"))]
2647                {
2648                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2649                }
2650                #[cfg(target_arch = "wasm32")]
2651                {
2652                    total_time_ms = 0.0;
2653                }
2654
2655                let profile_tree = crate::query::profile::build_profile_tree(
2656                    &optimized_plan.root,
2657                    &mut entries.into_iter(),
2658                );
2659                Ok(crate::query::profile::profile_result(
2660                    &profile_tree,
2661                    total_time_ms,
2662                ))
2663            });
2664        }
2665
2666        let has_mutations = optimized_plan.root.has_mutations();
2667
2668        let result = self.with_auto_commit(has_mutations, || {
2669            // Get transaction context for MVCC visibility
2670            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2671
2672            // Convert to physical plan with transaction context
2673            let planner =
2674                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2675            let mut physical_plan = planner.plan(&optimized_plan)?;
2676
2677            // Execute the plan
2678            let executor = Executor::with_columns(physical_plan.columns.clone())
2679                .with_deadline(self.query_deadline());
2680            executor.execute(physical_plan.operator.as_mut())
2681        });
2682
2683        #[cfg(feature = "metrics")]
2684        {
2685            #[cfg(not(target_arch = "wasm32"))]
2686            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2687            #[cfg(target_arch = "wasm32")]
2688            let elapsed_ms = None;
2689            self.record_query_metrics("cypher", elapsed_ms, &result);
2690        }
2691
2692        result
2693    }
2694
2695    /// Executes a Gremlin query.
2696    ///
2697    /// # Errors
2698    ///
2699    /// Returns an error if the query fails to parse or execute.
2700    ///
2701    /// # Examples
2702    ///
2703    /// ```no_run
2704    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2705    /// use grafeo_engine::GrafeoDB;
2706    ///
2707    /// let db = GrafeoDB::new_in_memory();
2708    /// let session = db.session();
2709    ///
2710    /// // Create some nodes first
2711    /// session.create_node(&["Person"]);
2712    ///
2713    /// // Query using Gremlin
2714    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
2715    /// # Ok(())
2716    /// # }
2717    /// ```
2718    #[cfg(feature = "gremlin")]
2719    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
2720        use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};
2721
2722        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2723        let start_time = Instant::now();
2724
2725        // Parse and translate the query to a logical plan
2726        let logical_plan = gremlin::translate(query)?;
2727
2728        // Semantic validation
2729        let mut binder = Binder::new();
2730        let _binding_context = binder.bind(&logical_plan)?;
2731
2732        // Optimize the plan
2733        let active = self.active_store();
2734        let optimizer = Optimizer::from_graph_store(&*active);
2735        let optimized_plan = optimizer.optimize(logical_plan)?;
2736
2737        let has_mutations = optimized_plan.root.has_mutations();
2738
2739        let result = self.with_auto_commit(has_mutations, || {
2740            // Get transaction context for MVCC visibility
2741            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2742
2743            // Convert to physical plan with transaction context
2744            let planner =
2745                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2746            let mut physical_plan = planner.plan(&optimized_plan)?;
2747
2748            // Execute the plan
2749            let executor = Executor::with_columns(physical_plan.columns.clone())
2750                .with_deadline(self.query_deadline());
2751            executor.execute(physical_plan.operator.as_mut())
2752        });
2753
2754        #[cfg(feature = "metrics")]
2755        {
2756            #[cfg(not(target_arch = "wasm32"))]
2757            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2758            #[cfg(target_arch = "wasm32")]
2759            let elapsed_ms = None;
2760            self.record_query_metrics("gremlin", elapsed_ms, &result);
2761        }
2762
2763        result
2764    }
2765
2766    /// Executes a Gremlin query with parameters.
2767    ///
2768    /// # Errors
2769    ///
2770    /// Returns an error if the query fails to parse or execute.
2771    #[cfg(feature = "gremlin")]
2772    pub fn execute_gremlin_with_params(
2773        &self,
2774        query: &str,
2775        params: std::collections::HashMap<String, Value>,
2776    ) -> Result<QueryResult> {
2777        use crate::query::processor::{QueryLanguage, QueryProcessor};
2778
2779        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2780        let start_time = Instant::now();
2781
2782        let has_mutations = Self::query_looks_like_mutation(query);
2783        let active = self.active_store();
2784
2785        let result = self.with_auto_commit(has_mutations, || {
2786            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2787            let processor = QueryProcessor::for_stores_with_transaction(
2788                Arc::clone(&active),
2789                self.active_write_store(),
2790                Arc::clone(&self.transaction_manager),
2791            )?;
2792            let processor = if let Some(transaction_id) = transaction_id {
2793                processor.with_transaction_context(viewing_epoch, transaction_id)
2794            } else {
2795                processor
2796            };
2797            processor.process(query, QueryLanguage::Gremlin, Some(&params))
2798        });
2799
2800        #[cfg(feature = "metrics")]
2801        {
2802            #[cfg(not(target_arch = "wasm32"))]
2803            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2804            #[cfg(target_arch = "wasm32")]
2805            let elapsed_ms = None;
2806            self.record_query_metrics("gremlin", elapsed_ms, &result);
2807        }
2808
2809        result
2810    }
2811
2812    /// Executes a GraphQL query against the LPG store.
2813    ///
2814    /// # Errors
2815    ///
2816    /// Returns an error if the query fails to parse or execute.
2817    ///
2818    /// # Examples
2819    ///
2820    /// ```no_run
2821    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2822    /// use grafeo_engine::GrafeoDB;
2823    ///
2824    /// let db = GrafeoDB::new_in_memory();
2825    /// let session = db.session();
2826    ///
2827    /// // Create some nodes first
2828    /// session.create_node(&["User"]);
2829    ///
2830    /// // Query using GraphQL
2831    /// let result = session.execute_graphql("query { user { id name } }")?;
2832    /// # Ok(())
2833    /// # }
2834    /// ```
2835    #[cfg(feature = "graphql")]
2836    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
2837        use crate::query::{
2838            Executor, binder::Binder, optimizer::Optimizer, processor::substitute_params,
2839            translators::graphql,
2840        };
2841
2842        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2843        let start_time = Instant::now();
2844
2845        let mut logical_plan = graphql::translate(query)?;
2846
2847        // Substitute default parameter values from variable declarations
2848        if !logical_plan.default_params.is_empty() {
2849            let defaults = logical_plan.default_params.clone();
2850            substitute_params(&mut logical_plan, &defaults)?;
2851        }
2852
2853        let mut binder = Binder::new();
2854        let _binding_context = binder.bind(&logical_plan)?;
2855
2856        let active = self.active_store();
2857        let optimizer = Optimizer::from_graph_store(&*active);
2858        let optimized_plan = optimizer.optimize(logical_plan)?;
2859        let has_mutations = optimized_plan.root.has_mutations();
2860
2861        let result = self.with_auto_commit(has_mutations, || {
2862            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2863            let planner =
2864                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2865            let mut physical_plan = planner.plan(&optimized_plan)?;
2866            let executor = Executor::with_columns(physical_plan.columns.clone())
2867                .with_deadline(self.query_deadline());
2868            executor.execute(physical_plan.operator.as_mut())
2869        });
2870
2871        #[cfg(feature = "metrics")]
2872        {
2873            #[cfg(not(target_arch = "wasm32"))]
2874            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2875            #[cfg(target_arch = "wasm32")]
2876            let elapsed_ms = None;
2877            self.record_query_metrics("graphql", elapsed_ms, &result);
2878        }
2879
2880        result
2881    }
2882
2883    /// Executes a GraphQL query with parameters.
2884    ///
2885    /// # Errors
2886    ///
2887    /// Returns an error if the query fails to parse or execute.
2888    #[cfg(feature = "graphql")]
2889    pub fn execute_graphql_with_params(
2890        &self,
2891        query: &str,
2892        params: std::collections::HashMap<String, Value>,
2893    ) -> Result<QueryResult> {
2894        use crate::query::processor::{QueryLanguage, QueryProcessor};
2895
2896        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2897        let start_time = Instant::now();
2898
2899        let has_mutations = Self::query_looks_like_mutation(query);
2900        let active = self.active_store();
2901
2902        let result = self.with_auto_commit(has_mutations, || {
2903            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2904            let processor = QueryProcessor::for_stores_with_transaction(
2905                Arc::clone(&active),
2906                self.active_write_store(),
2907                Arc::clone(&self.transaction_manager),
2908            )?;
2909            let processor = if let Some(transaction_id) = transaction_id {
2910                processor.with_transaction_context(viewing_epoch, transaction_id)
2911            } else {
2912                processor
2913            };
2914            processor.process(query, QueryLanguage::GraphQL, Some(&params))
2915        });
2916
2917        #[cfg(feature = "metrics")]
2918        {
2919            #[cfg(not(target_arch = "wasm32"))]
2920            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2921            #[cfg(target_arch = "wasm32")]
2922            let elapsed_ms = None;
2923            self.record_query_metrics("graphql", elapsed_ms, &result);
2924        }
2925
2926        result
2927    }
2928
2929    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
2930    ///
2931    /// # Errors
2932    ///
2933    /// Returns an error if the query fails to parse or execute.
2934    ///
2935    /// # Examples
2936    ///
2937    /// ```no_run
2938    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2939    /// use grafeo_engine::GrafeoDB;
2940    ///
2941    /// let db = GrafeoDB::new_in_memory();
2942    /// let session = db.session();
2943    ///
2944    /// let result = session.execute_sql(
2945    ///     "SELECT * FROM GRAPH_TABLE (
2946    ///         MATCH (n:Person)
2947    ///         COLUMNS (n.name AS name)
2948    ///     )"
2949    /// )?;
2950    /// # Ok(())
2951    /// # }
2952    /// ```
2953    #[cfg(feature = "sql-pgq")]
2954    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
2955        use crate::query::{
2956            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
2957            processor::QueryLanguage, translators::sql_pgq,
2958        };
2959
2960        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2961        let start_time = Instant::now();
2962
2963        // Parse and translate (always needed to check for DDL)
2964        let logical_plan = sql_pgq::translate(query)?;
2965
2966        // Handle DDL statements directly (they don't go through the query pipeline)
2967        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
2968            return Ok(QueryResult {
2969                columns: vec!["status".into()],
2970                column_types: vec![grafeo_common::types::LogicalType::String],
2971                rows: vec![vec![Value::from(format!(
2972                    "Property graph '{}' created ({} node tables, {} edge tables)",
2973                    cpg.name,
2974                    cpg.node_tables.len(),
2975                    cpg.edge_tables.len()
2976                ))]],
2977                execution_time_ms: None,
2978                rows_scanned: None,
2979                status_message: None,
2980                gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2981            });
2982        }
2983
2984        let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());
2985
2986        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2987            cached_plan
2988        } else {
2989            let mut binder = Binder::new();
2990            let _binding_context = binder.bind(&logical_plan)?;
2991            let active = self.active_store();
2992            let optimizer = Optimizer::from_graph_store(&*active);
2993            let plan = optimizer.optimize(logical_plan)?;
2994            self.query_cache.put_optimized(cache_key, plan.clone());
2995            plan
2996        };
2997
2998        let active = self.active_store();
2999        let has_mutations = optimized_plan.root.has_mutations();
3000
3001        let result = self.with_auto_commit(has_mutations, || {
3002            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3003            let planner =
3004                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3005            let mut physical_plan = planner.plan(&optimized_plan)?;
3006            let executor = Executor::with_columns(physical_plan.columns.clone())
3007                .with_deadline(self.query_deadline());
3008            executor.execute(physical_plan.operator.as_mut())
3009        });
3010
3011        #[cfg(feature = "metrics")]
3012        {
3013            #[cfg(not(target_arch = "wasm32"))]
3014            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3015            #[cfg(target_arch = "wasm32")]
3016            let elapsed_ms = None;
3017            self.record_query_metrics("sql", elapsed_ms, &result);
3018        }
3019
3020        result
3021    }
3022
3023    /// Executes a SQL/PGQ query with parameters.
3024    ///
3025    /// # Errors
3026    ///
3027    /// Returns an error if the query fails to parse or execute.
3028    #[cfg(feature = "sql-pgq")]
3029    pub fn execute_sql_with_params(
3030        &self,
3031        query: &str,
3032        params: std::collections::HashMap<String, Value>,
3033    ) -> Result<QueryResult> {
3034        use crate::query::processor::{QueryLanguage, QueryProcessor};
3035
3036        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3037        let start_time = Instant::now();
3038
3039        let has_mutations = Self::query_looks_like_mutation(query);
3040        let active = self.active_store();
3041
3042        let result = self.with_auto_commit(has_mutations, || {
3043            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3044            let processor = QueryProcessor::for_stores_with_transaction(
3045                Arc::clone(&active),
3046                self.active_write_store(),
3047                Arc::clone(&self.transaction_manager),
3048            )?;
3049            let processor = if let Some(transaction_id) = transaction_id {
3050                processor.with_transaction_context(viewing_epoch, transaction_id)
3051            } else {
3052                processor
3053            };
3054            processor.process(query, QueryLanguage::SqlPgq, Some(&params))
3055        });
3056
3057        #[cfg(feature = "metrics")]
3058        {
3059            #[cfg(not(target_arch = "wasm32"))]
3060            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3061            #[cfg(target_arch = "wasm32")]
3062            let elapsed_ms = None;
3063            self.record_query_metrics("sql", elapsed_ms, &result);
3064        }
3065
3066        result
3067    }
3068
3069    /// Executes a query in the specified language by name.
3070    ///
3071    /// Supported language names: `"gql"`, `"cypher"`, `"gremlin"`, `"graphql"`,
3072    /// `"graphql-rdf"`, `"sparql"`, `"sql"`. Each requires the corresponding feature flag.
3073    ///
3074    /// # Errors
3075    ///
3076    /// Returns an error if the language is unknown/disabled or the query fails.
3077    pub fn execute_language(
3078        &self,
3079        query: &str,
3080        language: &str,
3081        params: Option<std::collections::HashMap<String, Value>>,
3082    ) -> Result<QueryResult> {
3083        let _span = grafeo_info_span!(
3084            "grafeo::session::execute",
3085            language,
3086            query_len = query.len(),
3087        );
3088        match language {
3089            "gql" => {
3090                if let Some(p) = params {
3091                    self.execute_with_params(query, p)
3092                } else {
3093                    self.execute(query)
3094                }
3095            }
3096            #[cfg(feature = "cypher")]
3097            "cypher" => {
3098                if let Some(p) = params {
3099                    use crate::query::processor::{QueryLanguage, QueryProcessor};
3100
3101                    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3102                    let start_time = Instant::now();
3103
3104                    let has_mutations = Self::query_looks_like_mutation(query);
3105                    let active = self.active_store();
3106                    let result = self.with_auto_commit(has_mutations, || {
3107                        let processor = QueryProcessor::for_stores_with_transaction(
3108                            Arc::clone(&active),
3109                            self.active_write_store(),
3110                            Arc::clone(&self.transaction_manager),
3111                        )?;
3112                        let (viewing_epoch, transaction_id) = self.get_transaction_context();
3113                        let processor = if let Some(transaction_id) = transaction_id {
3114                            processor.with_transaction_context(viewing_epoch, transaction_id)
3115                        } else {
3116                            processor
3117                        };
3118                        processor.process(query, QueryLanguage::Cypher, Some(&p))
3119                    });
3120
3121                    #[cfg(feature = "metrics")]
3122                    {
3123                        #[cfg(not(target_arch = "wasm32"))]
3124                        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3125                        #[cfg(target_arch = "wasm32")]
3126                        let elapsed_ms = None;
3127                        self.record_query_metrics("cypher", elapsed_ms, &result);
3128                    }
3129
3130                    result
3131                } else {
3132                    self.execute_cypher(query)
3133                }
3134            }
3135            #[cfg(feature = "gremlin")]
3136            "gremlin" => {
3137                if let Some(p) = params {
3138                    self.execute_gremlin_with_params(query, p)
3139                } else {
3140                    self.execute_gremlin(query)
3141                }
3142            }
3143            #[cfg(feature = "graphql")]
3144            "graphql" => {
3145                if let Some(p) = params {
3146                    self.execute_graphql_with_params(query, p)
3147                } else {
3148                    self.execute_graphql(query)
3149                }
3150            }
3151            #[cfg(all(feature = "graphql", feature = "rdf"))]
3152            "graphql-rdf" => {
3153                if let Some(p) = params {
3154                    self.execute_graphql_rdf_with_params(query, p)
3155                } else {
3156                    self.execute_graphql_rdf(query)
3157                }
3158            }
3159            #[cfg(feature = "sql-pgq")]
3160            "sql" | "sql-pgq" => {
3161                if let Some(p) = params {
3162                    self.execute_sql_with_params(query, p)
3163                } else {
3164                    self.execute_sql(query)
3165                }
3166            }
3167            #[cfg(all(feature = "sparql", feature = "rdf"))]
3168            "sparql" => {
3169                if let Some(p) = params {
3170                    self.execute_sparql_with_params(query, p)
3171                } else {
3172                    self.execute_sparql(query)
3173                }
3174            }
3175            other => Err(grafeo_common::utils::error::Error::Query(
3176                grafeo_common::utils::error::QueryError::new(
3177                    grafeo_common::utils::error::QueryErrorKind::Semantic,
3178                    format!("Unknown query language: '{other}'"),
3179                ),
3180            )),
3181        }
3182    }
3183
3184    /// Begins a new transaction.
3185    ///
3186    /// # Errors
3187    ///
3188    /// Returns an error if a transaction is already active.
3189    ///
3190    /// # Examples
3191    ///
3192    /// ```no_run
3193    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3194    /// use grafeo_engine::GrafeoDB;
3195    ///
3196    /// let db = GrafeoDB::new_in_memory();
3197    /// let mut session = db.session();
3198    ///
3199    /// session.begin_transaction()?;
3200    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3201    /// session.execute("INSERT (:Person {name: 'Gus'})")?;
3202    /// session.commit()?; // Both inserts committed atomically
3203    /// # Ok(())
3204    /// # }
3205    /// ```
3206    /// Clears all cached query plans.
3207    ///
3208    /// The plan cache is shared across all sessions on the same database,
3209    /// so clearing from one session affects all sessions.
3210    pub fn clear_plan_cache(&self) {
3211        self.query_cache.clear();
3212    }
3213
3214    /// Begins a new transaction on this session.
3215    ///
3216    /// Uses the default isolation level (`SnapshotIsolation`).
3217    ///
3218    /// # Errors
3219    ///
3220    /// Returns an error if a transaction is already active.
3221    pub fn begin_transaction(&mut self) -> Result<()> {
3222        self.begin_transaction_inner(false, None)
3223    }
3224
3225    /// Begins a transaction with a specific isolation level.
3226    ///
3227    /// See [`begin_transaction`](Self::begin_transaction) for the default (`SnapshotIsolation`).
3228    ///
3229    /// # Errors
3230    ///
3231    /// Returns an error if a transaction is already active.
3232    pub fn begin_transaction_with_isolation(
3233        &mut self,
3234        isolation_level: crate::transaction::IsolationLevel,
3235    ) -> Result<()> {
3236        self.begin_transaction_inner(false, Some(isolation_level))
3237    }
3238
3239    /// Core transaction begin logic, usable from both `&mut self` and `&self` paths.
3240    fn begin_transaction_inner(
3241        &self,
3242        read_only: bool,
3243        isolation_level: Option<crate::transaction::IsolationLevel>,
3244    ) -> Result<()> {
3245        let _span = grafeo_debug_span!("grafeo::tx::begin", read_only);
3246        let mut current = self.current_transaction.lock();
3247        if current.is_some() {
3248            // Nested transaction: create an auto-savepoint instead of a new tx.
3249            drop(current);
3250            let mut depth = self.transaction_nesting_depth.lock();
3251            *depth += 1;
3252            let sp_name = format!("_nested_tx_{}", *depth);
3253            self.savepoint(&sp_name)?;
3254            return Ok(());
3255        }
3256
3257        let active = self.active_lpg_store();
3258        self.transaction_start_node_count
3259            .store(active.node_count(), Ordering::Relaxed);
3260        self.transaction_start_edge_count
3261            .store(active.edge_count(), Ordering::Relaxed);
3262        let transaction_id = if let Some(level) = isolation_level {
3263            self.transaction_manager.begin_with_isolation(level)
3264        } else {
3265            self.transaction_manager.begin()
3266        };
3267        *current = Some(transaction_id);
3268        *self.read_only_tx.lock() = read_only || self.db_read_only;
3269
3270        // Record the initial graph as "touched" for cross-graph atomicity.
3271        // Uses the full storage key (schema/graph) for schema-scoped resolution.
3272        let key = self.active_graph_storage_key();
3273        let mut touched = self.touched_graphs.lock();
3274        touched.clear();
3275        touched.push(key);
3276
3277        #[cfg(feature = "metrics")]
3278        {
3279            crate::metrics::record_metric!(self.metrics, tx_active, inc);
3280            #[cfg(not(target_arch = "wasm32"))]
3281            {
3282                *self.tx_start_time.lock() = Some(Instant::now());
3283            }
3284        }
3285
3286        Ok(())
3287    }
3288
3289    /// Commits the current transaction.
3290    ///
3291    /// Makes all changes since [`begin_transaction`](Self::begin_transaction) permanent.
3292    ///
3293    /// # Errors
3294    ///
3295    /// Returns an error if no transaction is active.
3296    pub fn commit(&mut self) -> Result<()> {
3297        self.commit_inner()
3298    }
3299
3300    /// Core commit logic, usable from both `&mut self` and `&self` paths.
3301    fn commit_inner(&self) -> Result<()> {
3302        let _span = grafeo_debug_span!("grafeo::tx::commit");
3303        // Nested transaction: release the auto-savepoint (changes are preserved).
3304        {
3305            let mut depth = self.transaction_nesting_depth.lock();
3306            if *depth > 0 {
3307                let sp_name = format!("_nested_tx_{depth}");
3308                *depth -= 1;
3309                drop(depth);
3310                return self.release_savepoint(&sp_name);
3311            }
3312        }
3313
3314        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3315            grafeo_common::utils::error::Error::Transaction(
3316                grafeo_common::utils::error::TransactionError::InvalidState(
3317                    "No active transaction".to_string(),
3318                ),
3319            )
3320        })?;
3321
3322        // Validate the transaction first (conflict detection) before committing data.
3323        // If this fails, we rollback the data changes instead of making them permanent.
3324        let touched = self.touched_graphs.lock().clone();
3325        let commit_epoch = match self.transaction_manager.commit(transaction_id) {
3326            Ok(epoch) => epoch,
3327            Err(e) => {
3328                // Conflict detected: rollback the data changes
3329                for graph_name in &touched {
3330                    let store = self.resolve_store(graph_name);
3331                    store.rollback_transaction_properties(transaction_id);
3332                }
3333                #[cfg(feature = "rdf")]
3334                self.rollback_rdf_transaction(transaction_id);
3335                // Discard buffered CDC events on conflict rollback
3336                #[cfg(feature = "cdc")]
3337                if let Some(ref pending) = self.cdc_pending_events {
3338                    pending.lock().clear();
3339                }
3340                *self.read_only_tx.lock() = self.db_read_only;
3341                self.savepoints.lock().clear();
3342                self.touched_graphs.lock().clear();
3343                #[cfg(feature = "metrics")]
3344                {
3345                    crate::metrics::record_metric!(self.metrics, tx_active, dec);
3346                    crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
3347                    #[cfg(not(target_arch = "wasm32"))]
3348                    if let Some(start) = self.tx_start_time.lock().take() {
3349                        let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3350                        crate::metrics::record_metric!(
3351                            self.metrics,
3352                            tx_duration,
3353                            observe duration_ms
3354                        );
3355                    }
3356                }
3357                return Err(e);
3358            }
3359        };
3360
3361        // Finalize PENDING epochs: make uncommitted versions visible at the commit epoch.
3362        for graph_name in &touched {
3363            let store = self.resolve_store(graph_name);
3364            store.finalize_version_epochs(transaction_id, commit_epoch);
3365        }
3366
3367        // Commit succeeded: discard undo logs (make changes permanent)
3368        #[cfg(feature = "rdf")]
3369        self.commit_rdf_transaction(transaction_id);
3370
3371        for graph_name in &touched {
3372            let store = self.resolve_store(graph_name);
3373            store.commit_transaction_properties(transaction_id);
3374        }
3375
3376        // Flush buffered CDC events now that the transaction is committed.
3377        // All buffered events have PENDING epoch; assign the real commit_epoch.
3378        // Uses record_batch to acquire the write lock once per commit.
3379        #[cfg(feature = "cdc")]
3380        if let Some(ref pending) = self.cdc_pending_events {
3381            let events: Vec<crate::cdc::ChangeEvent> = pending.lock().drain(..).collect();
3382            self.cdc_log.record_batch(events.into_iter().map(|mut e| {
3383                e.epoch = commit_epoch;
3384                e
3385            }));
3386        }
3387
3388        // Sync epoch for all touched graphs so that convenience lookups
3389        // (edge_type, get_edge, get_node) can see versions at the latest epoch.
3390        let current_epoch = self.transaction_manager.current_epoch();
3391        for graph_name in &touched {
3392            let store = self.resolve_store(graph_name);
3393            store.sync_epoch(current_epoch);
3394        }
3395
3396        // Reset read-only flag, clear savepoints and touched graphs
3397        *self.read_only_tx.lock() = self.db_read_only;
3398        self.savepoints.lock().clear();
3399        self.touched_graphs.lock().clear();
3400
3401        // Auto-GC: periodically prune old MVCC versions
3402        if self.gc_interval > 0 {
3403            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
3404            if count.is_multiple_of(self.gc_interval) {
3405                let min_epoch = self.transaction_manager.min_active_epoch();
3406                for graph_name in &touched {
3407                    let store = self.resolve_store(graph_name);
3408                    store.gc_versions(min_epoch);
3409                }
3410                self.transaction_manager.gc();
3411                #[cfg(feature = "metrics")]
3412                crate::metrics::record_metric!(self.metrics, gc_runs, inc);
3413            }
3414        }
3415
3416        #[cfg(feature = "metrics")]
3417        {
3418            crate::metrics::record_metric!(self.metrics, tx_active, dec);
3419            crate::metrics::record_metric!(self.metrics, tx_committed, inc);
3420            #[cfg(not(target_arch = "wasm32"))]
3421            if let Some(start) = self.tx_start_time.lock().take() {
3422                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3423                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3424            }
3425        }
3426
3427        Ok(())
3428    }
3429
3430    /// Aborts the current transaction.
3431    ///
3432    /// Discards all changes since [`begin_transaction`](Self::begin_transaction).
3433    ///
3434    /// # Errors
3435    ///
3436    /// Returns an error if no transaction is active.
3437    ///
3438    /// # Examples
3439    ///
3440    /// ```no_run
3441    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3442    /// use grafeo_engine::GrafeoDB;
3443    ///
3444    /// let db = GrafeoDB::new_in_memory();
3445    /// let mut session = db.session();
3446    ///
3447    /// session.begin_transaction()?;
3448    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3449    /// session.rollback()?; // Insert is discarded
3450    /// # Ok(())
3451    /// # }
3452    /// ```
3453    pub fn rollback(&mut self) -> Result<()> {
3454        self.rollback_inner()
3455    }
3456
3457    /// Core rollback logic, usable from both `&mut self` and `&self` paths.
3458    fn rollback_inner(&self) -> Result<()> {
3459        let _span = grafeo_debug_span!("grafeo::tx::rollback");
3460        // Nested transaction: rollback to the auto-savepoint.
3461        {
3462            let mut depth = self.transaction_nesting_depth.lock();
3463            if *depth > 0 {
3464                let sp_name = format!("_nested_tx_{depth}");
3465                *depth -= 1;
3466                drop(depth);
3467                return self.rollback_to_savepoint(&sp_name);
3468            }
3469        }
3470
3471        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3472            grafeo_common::utils::error::Error::Transaction(
3473                grafeo_common::utils::error::TransactionError::InvalidState(
3474                    "No active transaction".to_string(),
3475                ),
3476            )
3477        })?;
3478
3479        // Reset read-only flag
3480        *self.read_only_tx.lock() = self.db_read_only;
3481
3482        // Discard uncommitted versions in ALL touched LPG stores (cross-graph atomicity).
3483        let touched = self.touched_graphs.lock().clone();
3484        for graph_name in &touched {
3485            let store = self.resolve_store(graph_name);
3486            store.discard_uncommitted_versions(transaction_id);
3487        }
3488
3489        // Discard pending operations in the RDF store
3490        #[cfg(feature = "rdf")]
3491        self.rollback_rdf_transaction(transaction_id);
3492
3493        // Discard buffered CDC events on rollback
3494        #[cfg(feature = "cdc")]
3495        if let Some(ref pending) = self.cdc_pending_events {
3496            pending.lock().clear();
3497        }
3498
3499        // Clear savepoints and touched graphs
3500        self.savepoints.lock().clear();
3501        self.touched_graphs.lock().clear();
3502
3503        // Mark transaction as aborted in the manager
3504        let result = self.transaction_manager.abort(transaction_id);
3505
3506        #[cfg(feature = "metrics")]
3507        if result.is_ok() {
3508            crate::metrics::record_metric!(self.metrics, tx_active, dec);
3509            crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
3510            #[cfg(not(target_arch = "wasm32"))]
3511            if let Some(start) = self.tx_start_time.lock().take() {
3512                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3513                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3514            }
3515        }
3516
3517        result
3518    }
3519
3520    /// Creates a named savepoint within the current transaction.
3521    ///
3522    /// The savepoint captures the current node/edge ID counters so that
3523    /// [`rollback_to_savepoint`](Self::rollback_to_savepoint) can discard
3524    /// entities created after this point.
3525    ///
3526    /// # Errors
3527    ///
3528    /// Returns an error if no transaction is active.
3529    pub fn savepoint(&self, name: &str) -> Result<()> {
3530        let tx_id = self.current_transaction.lock().ok_or_else(|| {
3531            grafeo_common::utils::error::Error::Transaction(
3532                grafeo_common::utils::error::TransactionError::InvalidState(
3533                    "No active transaction".to_string(),
3534                ),
3535            )
3536        })?;
3537
3538        // Capture state for every graph touched so far.
3539        let touched = self.touched_graphs.lock().clone();
3540        let graph_snapshots: Vec<GraphSavepoint> = touched
3541            .iter()
3542            .map(|graph_name| {
3543                let store = self.resolve_store(graph_name);
3544                GraphSavepoint {
3545                    graph_name: graph_name.clone(),
3546                    next_node_id: store.peek_next_node_id(),
3547                    next_edge_id: store.peek_next_edge_id(),
3548                    undo_log_position: store.property_undo_log_position(tx_id),
3549                }
3550            })
3551            .collect();
3552
3553        self.savepoints.lock().push(SavepointState {
3554            name: name.to_string(),
3555            graph_snapshots,
3556            active_graph: self.current_graph.lock().clone(),
3557            #[cfg(feature = "cdc")]
3558            cdc_event_position: self
3559                .cdc_pending_events
3560                .as_ref()
3561                .map_or(0, |p| p.lock().len()),
3562        });
3563        Ok(())
3564    }
3565
3566    /// Rolls back to a named savepoint, undoing all writes made after it.
3567    ///
3568    /// The savepoint and any savepoints created after it are removed.
3569    /// Entities with IDs >= the savepoint snapshot are discarded.
3570    ///
3571    /// # Errors
3572    ///
3573    /// Returns an error if no transaction is active or the savepoint does not exist.
3574    pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
3575        let transaction_id = self.current_transaction.lock().ok_or_else(|| {
3576            grafeo_common::utils::error::Error::Transaction(
3577                grafeo_common::utils::error::TransactionError::InvalidState(
3578                    "No active transaction".to_string(),
3579                ),
3580            )
3581        })?;
3582
3583        let mut savepoints = self.savepoints.lock();
3584
3585        // Find the savepoint by name (search from the end for nested savepoints)
3586        let pos = savepoints
3587            .iter()
3588            .rposition(|sp| sp.name == name)
3589            .ok_or_else(|| {
3590                grafeo_common::utils::error::Error::Transaction(
3591                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
3592                        "Savepoint '{name}' not found"
3593                    )),
3594                )
3595            })?;
3596
3597        let sp_state = savepoints[pos].clone();
3598
3599        // Remove this savepoint and all later ones
3600        savepoints.truncate(pos);
3601        drop(savepoints);
3602
3603        // Roll back each graph that was captured in the savepoint.
3604        for gs in &sp_state.graph_snapshots {
3605            let store = self.resolve_store(&gs.graph_name);
3606
3607            // Replay property/label undo entries recorded after the savepoint
3608            store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);
3609
3610            // Discard entities created after the savepoint
3611            let current_next_node = store.peek_next_node_id();
3612            let current_next_edge = store.peek_next_edge_id();
3613
3614            let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
3615                .map(NodeId::new)
3616                .collect();
3617            let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
3618                .map(EdgeId::new)
3619                .collect();
3620
3621            if !node_ids.is_empty() || !edge_ids.is_empty() {
3622                store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
3623            }
3624        }
3625
3626        // Also roll back any graphs that were touched AFTER the savepoint
3627        // but not captured in it. These need full discard since the savepoint
3628        // didn't include them.
3629        let touched = self.touched_graphs.lock().clone();
3630        for graph_name in &touched {
3631            let already_captured = sp_state
3632                .graph_snapshots
3633                .iter()
3634                .any(|gs| gs.graph_name == *graph_name);
3635            if !already_captured {
3636                let store = self.resolve_store(graph_name);
3637                store.discard_uncommitted_versions(transaction_id);
3638            }
3639        }
3640
3641        // Truncate CDC event buffer to the savepoint position.
3642        #[cfg(feature = "cdc")]
3643        if let Some(ref pending) = self.cdc_pending_events {
3644            pending.lock().truncate(sp_state.cdc_event_position);
3645        }
3646
3647        // Restore touched_graphs to only the graphs that were known at savepoint time.
3648        let mut touched = self.touched_graphs.lock();
3649        touched.clear();
3650        for gs in &sp_state.graph_snapshots {
3651            if !touched.contains(&gs.graph_name) {
3652                touched.push(gs.graph_name.clone());
3653            }
3654        }
3655
3656        Ok(())
3657    }
3658
3659    /// Releases (removes) a named savepoint without rolling back.
3660    ///
3661    /// # Errors
3662    ///
3663    /// Returns an error if no transaction is active or the savepoint does not exist.
3664    pub fn release_savepoint(&self, name: &str) -> Result<()> {
3665        let _tx_id = self.current_transaction.lock().ok_or_else(|| {
3666            grafeo_common::utils::error::Error::Transaction(
3667                grafeo_common::utils::error::TransactionError::InvalidState(
3668                    "No active transaction".to_string(),
3669                ),
3670            )
3671        })?;
3672
3673        let mut savepoints = self.savepoints.lock();
3674        let pos = savepoints
3675            .iter()
3676            .rposition(|sp| sp.name == name)
3677            .ok_or_else(|| {
3678                grafeo_common::utils::error::Error::Transaction(
3679                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
3680                        "Savepoint '{name}' not found"
3681                    )),
3682                )
3683            })?;
3684        savepoints.remove(pos);
3685        Ok(())
3686    }
3687
3688    /// Returns whether a transaction is active.
3689    #[must_use]
3690    pub fn in_transaction(&self) -> bool {
3691        self.current_transaction.lock().is_some()
3692    }
3693
3694    /// Returns the current transaction ID, if any.
3695    #[must_use]
3696    pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
3697        *self.current_transaction.lock()
3698    }
3699
3700    /// Returns a reference to the transaction manager.
3701    #[must_use]
3702    pub(crate) fn transaction_manager(&self) -> &TransactionManager {
3703        &self.transaction_manager
3704    }
3705
3706    /// Returns the store's current node count and the count at transaction start.
3707    #[must_use]
3708    pub(crate) fn node_count_delta(&self) -> (usize, usize) {
3709        (
3710            self.transaction_start_node_count.load(Ordering::Relaxed),
3711            self.active_lpg_store().node_count(),
3712        )
3713    }
3714
3715    /// Returns the store's current edge count and the count at transaction start.
3716    #[must_use]
3717    pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
3718        (
3719            self.transaction_start_edge_count.load(Ordering::Relaxed),
3720            self.active_lpg_store().edge_count(),
3721        )
3722    }
3723
3724    /// Prepares the current transaction for a two-phase commit.
3725    ///
3726    /// Returns a [`PreparedCommit`](crate::transaction::PreparedCommit) that
3727    /// lets you inspect pending changes and attach metadata before finalizing.
3728    /// The mutable borrow prevents concurrent operations while the commit is
3729    /// pending.
3730    ///
3731    /// If the `PreparedCommit` is dropped without calling `commit()` or
3732    /// `abort()`, the transaction is automatically rolled back.
3733    ///
3734    /// # Errors
3735    ///
3736    /// Returns an error if no transaction is active.
3737    ///
3738    /// # Examples
3739    ///
3740    /// ```no_run
3741    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3742    /// use grafeo_engine::GrafeoDB;
3743    ///
3744    /// let db = GrafeoDB::new_in_memory();
3745    /// let mut session = db.session();
3746    ///
3747    /// session.begin_transaction()?;
3748    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3749    ///
3750    /// let mut prepared = session.prepare_commit()?;
3751    /// println!("Nodes written: {}", prepared.info().nodes_written);
3752    /// prepared.set_metadata("audit_user", "admin");
3753    /// prepared.commit()?;
3754    /// # Ok(())
3755    /// # }
3756    /// ```
3757    pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
3758        crate::transaction::PreparedCommit::new(self)
3759    }
3760
3761    /// Sets auto-commit mode.
3762    pub fn set_auto_commit(&mut self, auto_commit: bool) {
3763        self.auto_commit = auto_commit;
3764    }
3765
3766    /// Returns whether auto-commit is enabled.
3767    #[must_use]
3768    pub fn auto_commit(&self) -> bool {
3769        self.auto_commit
3770    }
3771
3772    /// Returns `true` if auto-commit should wrap this execution.
3773    ///
3774    /// Auto-commit kicks in when: the session is in auto-commit mode,
3775    /// no explicit transaction is active, and the query mutates data.
3776    fn needs_auto_commit(&self, has_mutations: bool) -> bool {
3777        self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
3778    }
3779
3780    /// Wraps `body` in an automatic begin/commit when [`needs_auto_commit`]
3781    /// returns `true`. On error the transaction is rolled back.
3782    fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
3783    where
3784        F: FnOnce() -> Result<QueryResult>,
3785    {
3786        if self.needs_auto_commit(has_mutations) {
3787            self.begin_transaction_inner(false, None)?;
3788            match body() {
3789                Ok(result) => {
3790                    self.commit_inner()?;
3791                    Ok(result)
3792                }
3793                Err(e) => {
3794                    let _ = self.rollback_inner();
3795                    Err(e)
3796                }
3797            }
3798        } else {
3799            body()
3800        }
3801    }
3802
3803    /// Quick heuristic: returns `true` when the query text looks like it
3804    /// performs a mutation. Used by `_with_params` paths that go through the
3805    /// `QueryProcessor` (where the logical plan isn't available before
3806    /// execution). False negatives are harmless: the data just won't be
3807    /// auto-committed, which matches the prior behaviour.
3808    fn query_looks_like_mutation(query: &str) -> bool {
3809        let upper = query.to_ascii_uppercase();
3810        upper.contains("INSERT")
3811            || upper.contains("CREATE")
3812            || upper.contains("DELETE")
3813            || upper.contains("MERGE")
3814            || upper.contains("SET")
3815            || upper.contains("REMOVE")
3816            || upper.contains("DROP")
3817            || upper.contains("ALTER")
3818    }
3819
3820    /// Computes the wall-clock deadline for query execution.
3821    #[must_use]
3822    fn query_deadline(&self) -> Option<Instant> {
3823        #[cfg(not(target_arch = "wasm32"))]
3824        {
3825            self.query_timeout.map(|d| Instant::now() + d)
3826        }
3827        #[cfg(target_arch = "wasm32")]
3828        {
3829            let _ = &self.query_timeout;
3830            None
3831        }
3832    }
3833
3834    /// Records query metrics for any language.
3835    ///
3836    /// Called after query execution to update counters, latency histogram,
3837    /// and per-language tracking. `elapsed_ms` should be `None` on WASM
3838    /// where `Instant` is unavailable.
3839    #[cfg(feature = "metrics")]
3840    fn record_query_metrics(
3841        &self,
3842        language: &str,
3843        elapsed_ms: Option<f64>,
3844        result: &Result<crate::database::QueryResult>,
3845    ) {
3846        use crate::metrics::record_metric;
3847
3848        record_metric!(self.metrics, query_count, inc);
3849        if let Some(ref reg) = self.metrics {
3850            reg.query_count_by_language.increment(language);
3851        }
3852        if let Some(ms) = elapsed_ms {
3853            record_metric!(self.metrics, query_latency, observe ms);
3854        }
3855        match result {
3856            Ok(r) => {
3857                let returned = r.rows.len() as u64;
3858                record_metric!(self.metrics, rows_returned, add returned);
3859                if let Some(scanned) = r.rows_scanned {
3860                    record_metric!(self.metrics, rows_scanned, add scanned);
3861                }
3862            }
3863            Err(e) => {
3864                record_metric!(self.metrics, query_errors, inc);
3865                // Detect timeout errors
3866                let msg = e.to_string();
3867                if msg.contains("exceeded timeout") {
3868                    record_metric!(self.metrics, query_timeouts, inc);
3869                }
3870            }
3871        }
3872    }
3873
3874    /// Evaluates a simple integer literal from a session parameter expression.
3875    fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
3876        use grafeo_adapters::query::gql::ast::{Expression, Literal};
3877        match expr {
3878            Expression::Literal(Literal::Integer(n)) => Some(*n),
3879            _ => None,
3880        }
3881    }
3882
3883    /// Returns the current transaction context for MVCC visibility.
3884    ///
3885    /// Returns `(viewing_epoch, transaction_id)` where:
3886    /// - `viewing_epoch` is the epoch at which to check version visibility
3887    /// - `transaction_id` is the current transaction ID (if in a transaction)
3888    #[must_use]
3889    fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
3890        // Time-travel override takes precedence (read-only, no tx context)
3891        if let Some(epoch) = *self.viewing_epoch_override.lock() {
3892            return (epoch, None);
3893        }
3894
3895        if let Some(transaction_id) = *self.current_transaction.lock() {
3896            // In a transaction: use the transaction's start epoch
3897            let epoch = self
3898                .transaction_manager
3899                .start_epoch(transaction_id)
3900                .unwrap_or_else(|| self.transaction_manager.current_epoch());
3901            (epoch, Some(transaction_id))
3902        } else {
3903            // No transaction: use current epoch
3904            (self.transaction_manager.current_epoch(), None)
3905        }
3906    }
3907
3908    /// Creates a planner with transaction context and constraint validator.
3909    ///
3910    /// The `store` parameter is the graph store to plan against (use
3911    /// `self.active_store()` for graph-aware execution).
3912    fn create_planner_for_store(
3913        &self,
3914        store: Arc<dyn GraphStore>,
3915        viewing_epoch: EpochId,
3916        transaction_id: Option<TransactionId>,
3917    ) -> crate::query::Planner {
3918        self.create_planner_for_store_with_read_only(store, viewing_epoch, transaction_id, false)
3919    }
3920
3921    fn create_planner_for_store_with_read_only(
3922        &self,
3923        store: Arc<dyn GraphStore>,
3924        viewing_epoch: EpochId,
3925        transaction_id: Option<TransactionId>,
3926        read_only: bool,
3927    ) -> crate::query::Planner {
3928        use crate::query::Planner;
3929        use grafeo_core::execution::operators::{LazyValue, SessionContext};
3930
3931        // Capture store reference for lazy introspection (only computed if info()/schema() called).
3932        let info_store = Arc::clone(&store);
3933        let schema_store = Arc::clone(&store);
3934
3935        let session_context = SessionContext {
3936            current_schema: self.current_schema(),
3937            current_graph: self.current_graph(),
3938            db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
3939            schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
3940        };
3941
3942        let write_store = self.active_write_store();
3943
3944        let mut planner = Planner::with_context(
3945            Arc::clone(&store),
3946            write_store,
3947            Arc::clone(&self.transaction_manager),
3948            transaction_id,
3949            viewing_epoch,
3950        )
3951        .with_factorized_execution(self.factorized_execution)
3952        .with_catalog(Arc::clone(&self.catalog))
3953        .with_session_context(session_context)
3954        .with_read_only(read_only);
3955
3956        // Attach the constraint validator for schema enforcement
3957        let validator =
3958            CatalogConstraintValidator::new(Arc::clone(&self.catalog)).with_store(store);
3959        planner = planner.with_validator(Arc::new(validator));
3960
3961        planner
3962    }
3963
3964    /// Builds a `Value::Map` for the `info()` introspection function.
3965    fn build_info_value(store: &dyn GraphStore) -> Value {
3966        use grafeo_common::types::PropertyKey;
3967        use std::collections::BTreeMap;
3968
3969        let mut map = BTreeMap::new();
3970        map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
3971        map.insert(
3972            PropertyKey::from("node_count"),
3973            Value::Int64(store.node_count() as i64),
3974        );
3975        map.insert(
3976            PropertyKey::from("edge_count"),
3977            Value::Int64(store.edge_count() as i64),
3978        );
3979        map.insert(
3980            PropertyKey::from("version"),
3981            Value::String(env!("CARGO_PKG_VERSION").into()),
3982        );
3983        Value::Map(map.into())
3984    }
3985
3986    /// Builds a `Value::Map` for the `schema()` introspection function.
3987    fn build_schema_value(store: &dyn GraphStore) -> Value {
3988        use grafeo_common::types::PropertyKey;
3989        use std::collections::BTreeMap;
3990
3991        let labels: Vec<Value> = store
3992            .all_labels()
3993            .into_iter()
3994            .map(|l| Value::String(l.into()))
3995            .collect();
3996        let edge_types: Vec<Value> = store
3997            .all_edge_types()
3998            .into_iter()
3999            .map(|t| Value::String(t.into()))
4000            .collect();
4001        let property_keys: Vec<Value> = store
4002            .all_property_keys()
4003            .into_iter()
4004            .map(|k| Value::String(k.into()))
4005            .collect();
4006
4007        let mut map = BTreeMap::new();
4008        map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
4009        map.insert(
4010            PropertyKey::from("edge_types"),
4011            Value::List(edge_types.into()),
4012        );
4013        map.insert(
4014            PropertyKey::from("property_keys"),
4015            Value::List(property_keys.into()),
4016        );
4017        Value::Map(map.into())
4018    }
4019
4020    /// Creates a node directly (bypassing query execution).
4021    ///
4022    /// This is a low-level API for testing and direct manipulation.
4023    /// If a transaction is active, the node will be versioned with the transaction ID.
4024    pub fn create_node(&self, labels: &[&str]) -> NodeId {
4025        let (epoch, transaction_id) = self.get_transaction_context();
4026        self.active_lpg_store().create_node_versioned(
4027            labels,
4028            epoch,
4029            transaction_id.unwrap_or(TransactionId::SYSTEM),
4030        )
4031    }
4032
4033    /// Creates a node with properties.
4034    ///
4035    /// If a transaction is active, the node will be versioned with the transaction ID.
4036    pub fn create_node_with_props<'a>(
4037        &self,
4038        labels: &[&str],
4039        properties: impl IntoIterator<Item = (&'a str, Value)>,
4040    ) -> NodeId {
4041        let (epoch, transaction_id) = self.get_transaction_context();
4042        self.active_lpg_store().create_node_with_props_versioned(
4043            labels,
4044            properties,
4045            epoch,
4046            transaction_id.unwrap_or(TransactionId::SYSTEM),
4047        )
4048    }
4049
4050    /// Creates an edge between two nodes.
4051    ///
4052    /// This is a low-level API for testing and direct manipulation.
4053    /// If a transaction is active, the edge will be versioned with the transaction ID.
4054    pub fn create_edge(
4055        &self,
4056        src: NodeId,
4057        dst: NodeId,
4058        edge_type: &str,
4059    ) -> grafeo_common::types::EdgeId {
4060        let (epoch, transaction_id) = self.get_transaction_context();
4061        self.active_lpg_store().create_edge_versioned(
4062            src,
4063            dst,
4064            edge_type,
4065            epoch,
4066            transaction_id.unwrap_or(TransactionId::SYSTEM),
4067        )
4068    }
4069
4070    /// Creates an edge with properties within the active transaction context.
4071    pub fn create_edge_with_props<'a>(
4072        &self,
4073        src: NodeId,
4074        dst: NodeId,
4075        edge_type: &str,
4076        properties: impl IntoIterator<Item = (&'a str, Value)>,
4077    ) -> grafeo_common::types::EdgeId {
4078        let (epoch, transaction_id) = self.get_transaction_context();
4079        let tid = transaction_id.unwrap_or(TransactionId::SYSTEM);
4080        let store = self.active_lpg_store();
4081        let eid = store.create_edge_versioned(src, dst, edge_type, epoch, tid);
4082        for (key, value) in properties {
4083            store.set_edge_property_versioned(eid, key, value, tid);
4084        }
4085        eid
4086    }
4087
4088    /// Sets a node property within the active transaction context.
4089    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
4090        let (_, transaction_id) = self.get_transaction_context();
4091        if let Some(tid) = transaction_id {
4092            self.active_lpg_store()
4093                .set_node_property_versioned(id, key, value, tid);
4094        } else {
4095            self.active_lpg_store().set_node_property(id, key, value);
4096        }
4097    }
4098
4099    /// Sets an edge property within the active transaction context.
4100    pub fn set_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str, value: Value) {
4101        let (_, transaction_id) = self.get_transaction_context();
4102        if let Some(tid) = transaction_id {
4103            self.active_lpg_store()
4104                .set_edge_property_versioned(id, key, value, tid);
4105        } else {
4106            self.active_lpg_store().set_edge_property(id, key, value);
4107        }
4108    }
4109
4110    /// Deletes a node within the active transaction context.
4111    pub fn delete_node(&self, id: NodeId) -> bool {
4112        let (epoch, transaction_id) = self.get_transaction_context();
4113        if let Some(tid) = transaction_id {
4114            self.active_lpg_store()
4115                .delete_node_versioned(id, epoch, tid)
4116        } else {
4117            self.active_lpg_store().delete_node(id)
4118        }
4119    }
4120
4121    /// Deletes an edge within the active transaction context.
4122    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
4123        let (epoch, transaction_id) = self.get_transaction_context();
4124        if let Some(tid) = transaction_id {
4125            self.active_lpg_store()
4126                .delete_edge_versioned(id, epoch, tid)
4127        } else {
4128            self.active_lpg_store().delete_edge(id)
4129        }
4130    }
4131
4132    // =========================================================================
4133    // Direct Lookup APIs (bypass query planning for O(1) point reads)
4134    // =========================================================================
4135
4136    /// Gets a node by ID directly, bypassing query planning.
4137    ///
4138    /// This is the fastest way to retrieve a single node when you know its ID.
4139    /// Skips parsing, binding, optimization, and physical planning entirely.
4140    ///
4141    /// # Performance
4142    ///
4143    /// - Time complexity: O(1) average case
4144    /// - No lock contention (uses DashMap internally)
4145    /// - ~20-30x faster than equivalent MATCH query
4146    ///
4147    /// # Example
4148    ///
4149    /// ```no_run
4150    /// # use grafeo_engine::GrafeoDB;
4151    /// # let db = GrafeoDB::new_in_memory();
4152    /// let session = db.session();
4153    /// let node_id = session.create_node(&["Person"]);
4154    ///
4155    /// // Direct lookup - O(1), no query planning
4156    /// let node = session.get_node(node_id);
4157    /// assert!(node.is_some());
4158    /// ```
4159    #[must_use]
4160    pub fn get_node(&self, id: NodeId) -> Option<Node> {
4161        let (epoch, transaction_id) = self.get_transaction_context();
4162        self.active_lpg_store().get_node_versioned(
4163            id,
4164            epoch,
4165            transaction_id.unwrap_or(TransactionId::SYSTEM),
4166        )
4167    }
4168
4169    /// Gets a single property from a node by ID, bypassing query planning.
4170    ///
4171    /// More efficient than `get_node()` when you only need one property,
4172    /// as it avoids loading the full node with all properties.
4173    ///
4174    /// # Performance
4175    ///
4176    /// - Time complexity: O(1) average case
4177    /// - No query planning overhead
4178    ///
4179    /// # Example
4180    ///
4181    /// ```no_run
4182    /// # use grafeo_engine::GrafeoDB;
4183    /// # use grafeo_common::types::Value;
4184    /// # let db = GrafeoDB::new_in_memory();
4185    /// let session = db.session();
4186    /// let id = session.create_node_with_props(&["Person"], [("name", "Alix".into())]);
4187    ///
4188    /// // Direct property access - O(1)
4189    /// let name = session.get_node_property(id, "name");
4190    /// assert_eq!(name, Some(Value::String("Alix".into())));
4191    /// ```
4192    #[must_use]
4193    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
4194        self.get_node(id)
4195            .and_then(|node| node.get_property(key).cloned())
4196    }
4197
4198    /// Gets an edge by ID directly, bypassing query planning.
4199    ///
4200    /// # Performance
4201    ///
4202    /// - Time complexity: O(1) average case
4203    /// - No lock contention
4204    #[must_use]
4205    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
4206        let (epoch, transaction_id) = self.get_transaction_context();
4207        self.active_lpg_store().get_edge_versioned(
4208            id,
4209            epoch,
4210            transaction_id.unwrap_or(TransactionId::SYSTEM),
4211        )
4212    }
4213
4214    /// Gets outgoing neighbors of a node directly, bypassing query planning.
4215    ///
4216    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
4217    ///
4218    /// # Performance
4219    ///
4220    /// - Time complexity: O(degree) where degree is the number of outgoing edges
4221    /// - Uses adjacency index for direct access
4222    /// - ~10-20x faster than equivalent MATCH query
4223    ///
4224    /// # Example
4225    ///
4226    /// ```no_run
4227    /// # use grafeo_engine::GrafeoDB;
4228    /// # let db = GrafeoDB::new_in_memory();
4229    /// let session = db.session();
4230    /// let alix = session.create_node(&["Person"]);
4231    /// let gus = session.create_node(&["Person"]);
4232    /// session.create_edge(alix, gus, "KNOWS");
4233    ///
4234    /// // Direct neighbor lookup - O(degree)
4235    /// let neighbors = session.get_neighbors_outgoing(alix);
4236    /// assert_eq!(neighbors.len(), 1);
4237    /// assert_eq!(neighbors[0].0, gus);
4238    /// ```
4239    #[must_use]
4240    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4241        self.active_lpg_store()
4242            .edges_from(node, Direction::Outgoing)
4243            .collect()
4244    }
4245
4246    /// Gets incoming neighbors of a node directly, bypassing query planning.
4247    ///
4248    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
4249    ///
4250    /// # Performance
4251    ///
4252    /// - Time complexity: O(degree) where degree is the number of incoming edges
4253    /// - Uses backward adjacency index for direct access
4254    #[must_use]
4255    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4256        self.active_lpg_store()
4257            .edges_from(node, Direction::Incoming)
4258            .collect()
4259    }
4260
4261    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
4262    ///
4263    /// # Example
4264    ///
4265    /// ```no_run
4266    /// # use grafeo_engine::GrafeoDB;
4267    /// # let db = GrafeoDB::new_in_memory();
4268    /// # let session = db.session();
4269    /// # let alix = session.create_node(&["Person"]);
4270    /// let neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
4271    /// ```
4272    #[must_use]
4273    pub fn get_neighbors_outgoing_by_type(
4274        &self,
4275        node: NodeId,
4276        edge_type: &str,
4277    ) -> Vec<(NodeId, EdgeId)> {
4278        self.active_lpg_store()
4279            .edges_from(node, Direction::Outgoing)
4280            .filter(|(_, edge_id)| {
4281                self.get_edge(*edge_id)
4282                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
4283            })
4284            .collect()
4285    }
4286
4287    /// Checks if a node exists, bypassing query planning.
4288    ///
4289    /// # Performance
4290    ///
4291    /// - Time complexity: O(1)
4292    /// - Fastest existence check available
4293    #[must_use]
4294    pub fn node_exists(&self, id: NodeId) -> bool {
4295        self.get_node(id).is_some()
4296    }
4297
4298    /// Checks if an edge exists, bypassing query planning.
4299    #[must_use]
4300    pub fn edge_exists(&self, id: EdgeId) -> bool {
4301        self.get_edge(id).is_some()
4302    }
4303
4304    /// Gets the degree (number of edges) of a node.
4305    ///
4306    /// Returns (outgoing_degree, incoming_degree).
4307    #[must_use]
4308    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
4309        let active = self.active_lpg_store();
4310        let out = active.out_degree(node);
4311        let in_degree = active.in_degree(node);
4312        (out, in_degree)
4313    }
4314
4315    /// Batch lookup of multiple nodes by ID.
4316    ///
4317    /// More efficient than calling `get_node()` in a loop because it
4318    /// amortizes overhead.
4319    ///
4320    /// # Performance
4321    ///
4322    /// - Time complexity: O(n) where n is the number of IDs
4323    /// - Better cache utilization than individual lookups
4324    #[must_use]
4325    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
4326        let (epoch, transaction_id) = self.get_transaction_context();
4327        let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
4328        let active = self.active_lpg_store();
4329        ids.iter()
4330            .map(|&id| active.get_node_versioned(id, epoch, tx))
4331            .collect()
4332    }
4333
4334    // ── Change Data Capture ─────────────────────────────────────────────
4335
4336    /// Returns the full change history for an entity (node or edge).
4337    #[cfg(feature = "cdc")]
4338    pub fn history(
4339        &self,
4340        entity_id: impl Into<crate::cdc::EntityId>,
4341    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4342        Ok(self.cdc_log.history(entity_id.into()))
4343    }
4344
4345    /// Returns change events for an entity since the given epoch.
4346    #[cfg(feature = "cdc")]
4347    pub fn history_since(
4348        &self,
4349        entity_id: impl Into<crate::cdc::EntityId>,
4350        since_epoch: EpochId,
4351    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4352        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
4353    }
4354
4355    /// Returns all change events across all entities in an epoch range.
4356    #[cfg(feature = "cdc")]
4357    pub fn changes_between(
4358        &self,
4359        start_epoch: EpochId,
4360        end_epoch: EpochId,
4361    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4362        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
4363    }
4364}
4365
4366impl Drop for Session {
4367    fn drop(&mut self) {
4368        // Auto-rollback any active transaction to prevent leaked MVCC state,
4369        // dangling write locks, and uncommitted versions lingering in the store.
4370        if self.in_transaction() {
4371            let _ = self.rollback_inner();
4372        }
4373
4374        #[cfg(feature = "metrics")]
4375        if let Some(ref reg) = self.metrics {
4376            reg.session_active
4377                .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
4378        }
4379    }
4380}
4381
4382#[cfg(test)]
4383mod tests {
4384    use super::parse_default_literal;
4385    use crate::database::GrafeoDB;
4386    use grafeo_common::types::Value;
4387
4388    // -----------------------------------------------------------------------
4389    // parse_default_literal
4390    // -----------------------------------------------------------------------
4391
4392    #[test]
4393    fn parse_default_literal_null() {
4394        assert_eq!(parse_default_literal("null"), Value::Null);
4395        assert_eq!(parse_default_literal("NULL"), Value::Null);
4396        assert_eq!(parse_default_literal("Null"), Value::Null);
4397    }
4398
4399    #[test]
4400    fn parse_default_literal_bool() {
4401        assert_eq!(parse_default_literal("true"), Value::Bool(true));
4402        assert_eq!(parse_default_literal("TRUE"), Value::Bool(true));
4403        assert_eq!(parse_default_literal("false"), Value::Bool(false));
4404        assert_eq!(parse_default_literal("FALSE"), Value::Bool(false));
4405    }
4406
4407    #[test]
4408    fn parse_default_literal_string_single_quoted() {
4409        assert_eq!(
4410            parse_default_literal("'hello'"),
4411            Value::String("hello".into())
4412        );
4413    }
4414
4415    #[test]
4416    fn parse_default_literal_string_double_quoted() {
4417        assert_eq!(
4418            parse_default_literal("\"world\""),
4419            Value::String("world".into())
4420        );
4421    }
4422
4423    #[test]
4424    fn parse_default_literal_integer() {
4425        assert_eq!(parse_default_literal("42"), Value::Int64(42));
4426        assert_eq!(parse_default_literal("-7"), Value::Int64(-7));
4427        assert_eq!(parse_default_literal("0"), Value::Int64(0));
4428    }
4429
4430    #[test]
4431    fn parse_default_literal_float() {
4432        assert_eq!(parse_default_literal("9.81"), Value::Float64(9.81_f64));
4433        assert_eq!(parse_default_literal("-0.5"), Value::Float64(-0.5));
4434    }
4435
4436    #[test]
4437    fn parse_default_literal_fallback_string() {
4438        // Not a recognized literal, not quoted, not a number
4439        assert_eq!(
4440            parse_default_literal("some_identifier"),
4441            Value::String("some_identifier".into())
4442        );
4443    }
4444
4445    #[test]
4446    fn test_session_create_node() {
4447        let db = GrafeoDB::new_in_memory();
4448        let session = db.session();
4449
4450        let id = session.create_node(&["Person"]);
4451        assert!(id.is_valid());
4452        assert_eq!(db.node_count(), 1);
4453    }
4454
4455    #[test]
4456    fn test_session_transaction() {
4457        let db = GrafeoDB::new_in_memory();
4458        let mut session = db.session();
4459
4460        assert!(!session.in_transaction());
4461
4462        session.begin_transaction().unwrap();
4463        assert!(session.in_transaction());
4464
4465        session.commit().unwrap();
4466        assert!(!session.in_transaction());
4467    }
4468
4469    #[test]
4470    fn test_session_transaction_context() {
4471        let db = GrafeoDB::new_in_memory();
4472        let mut session = db.session();
4473
4474        // Without transaction - context should have current epoch and no transaction_id
4475        let (_epoch1, transaction_id1) = session.get_transaction_context();
4476        assert!(transaction_id1.is_none());
4477
4478        // Start a transaction
4479        session.begin_transaction().unwrap();
4480        let (epoch2, transaction_id2) = session.get_transaction_context();
4481        assert!(transaction_id2.is_some());
4482        // Transaction should have a valid epoch
4483        let _ = epoch2; // Use the variable
4484
4485        // Commit and verify
4486        session.commit().unwrap();
4487        let (epoch3, tx_id3) = session.get_transaction_context();
4488        assert!(tx_id3.is_none());
4489        // Epoch should have advanced after commit
4490        assert!(epoch3.as_u64() >= epoch2.as_u64());
4491    }
4492
4493    #[test]
4494    fn test_session_rollback() {
4495        let db = GrafeoDB::new_in_memory();
4496        let mut session = db.session();
4497
4498        session.begin_transaction().unwrap();
4499        session.rollback().unwrap();
4500        assert!(!session.in_transaction());
4501    }
4502
4503    #[test]
4504    fn test_session_rollback_discards_versions() {
4505        use grafeo_common::types::TransactionId;
4506
4507        let db = GrafeoDB::new_in_memory();
4508
4509        // Create a node outside of any transaction (at system level)
4510        let node_before = db.store().create_node(&["Person"]);
4511        assert!(node_before.is_valid());
4512        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4513
4514        // Start a transaction
4515        let mut session = db.session();
4516        session.begin_transaction().unwrap();
4517        let transaction_id = session.current_transaction.lock().unwrap();
4518
4519        // Create a node versioned with the transaction's ID
4520        let epoch = db.store().current_epoch();
4521        let node_in_tx = db
4522            .store()
4523            .create_node_versioned(&["Person"], epoch, transaction_id);
4524        assert!(node_in_tx.is_valid());
4525
4526        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4527        // non-versioned lookups like node_count(). Verify the node is visible
4528        // only through the owning transaction.
4529        assert_eq!(
4530            db.node_count(),
4531            1,
4532            "PENDING nodes should be invisible to non-versioned node_count()"
4533        );
4534        assert!(
4535            db.store()
4536                .get_node_versioned(node_in_tx, epoch, transaction_id)
4537                .is_some(),
4538            "Transaction node should be visible to its own transaction"
4539        );
4540
4541        // Rollback the transaction
4542        session.rollback().unwrap();
4543        assert!(!session.in_transaction());
4544
4545        // The node created in the transaction should be discarded
4546        // Only the first node should remain visible
4547        let count_after = db.node_count();
4548        assert_eq!(
4549            count_after, 1,
4550            "Rollback should discard uncommitted node, but got {count_after}"
4551        );
4552
4553        // The original node should still be accessible
4554        let current_epoch = db.store().current_epoch();
4555        assert!(
4556            db.store()
4557                .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
4558                .is_some(),
4559            "Original node should still exist"
4560        );
4561
4562        // The node created in the transaction should not be accessible
4563        assert!(
4564            db.store()
4565                .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
4566                .is_none(),
4567            "Transaction node should be gone"
4568        );
4569    }
4570
4571    #[test]
4572    fn test_session_create_node_in_transaction() {
4573        // Test that session.create_node() is transaction-aware
4574        let db = GrafeoDB::new_in_memory();
4575
4576        // Create a node outside of any transaction
4577        let node_before = db.create_node(&["Person"]);
4578        assert!(node_before.is_valid());
4579        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4580
4581        // Start a transaction and create a node through the session
4582        let mut session = db.session();
4583        session.begin_transaction().unwrap();
4584        let transaction_id = session.current_transaction.lock().unwrap();
4585
4586        // Create a node through session.create_node() - should be versioned with tx
4587        let node_in_tx = session.create_node(&["Person"]);
4588        assert!(node_in_tx.is_valid());
4589
4590        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4591        // non-versioned lookups. Verify the node is visible only to its own tx.
4592        assert_eq!(
4593            db.node_count(),
4594            1,
4595            "PENDING nodes should be invisible to non-versioned node_count()"
4596        );
4597        let epoch = db.store().current_epoch();
4598        assert!(
4599            db.store()
4600                .get_node_versioned(node_in_tx, epoch, transaction_id)
4601                .is_some(),
4602            "Transaction node should be visible to its own transaction"
4603        );
4604
4605        // Rollback the transaction
4606        session.rollback().unwrap();
4607
4608        // The node created via session.create_node() should be discarded
4609        let count_after = db.node_count();
4610        assert_eq!(
4611            count_after, 1,
4612            "Rollback should discard node created via session.create_node(), but got {count_after}"
4613        );
4614    }
4615
4616    #[test]
4617    fn test_session_create_node_with_props_in_transaction() {
4618        use grafeo_common::types::Value;
4619
4620        // Test that session.create_node_with_props() is transaction-aware
4621        let db = GrafeoDB::new_in_memory();
4622
4623        // Create a node outside of any transaction
4624        db.create_node(&["Person"]);
4625        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4626
4627        // Start a transaction and create a node with properties
4628        let mut session = db.session();
4629        session.begin_transaction().unwrap();
4630        let transaction_id = session.current_transaction.lock().unwrap();
4631
4632        let node_in_tx =
4633            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4634        assert!(node_in_tx.is_valid());
4635
4636        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4637        // non-versioned lookups. Verify the node is visible only to its own tx.
4638        assert_eq!(
4639            db.node_count(),
4640            1,
4641            "PENDING nodes should be invisible to non-versioned node_count()"
4642        );
4643        let epoch = db.store().current_epoch();
4644        assert!(
4645            db.store()
4646                .get_node_versioned(node_in_tx, epoch, transaction_id)
4647                .is_some(),
4648            "Transaction node should be visible to its own transaction"
4649        );
4650
4651        // Rollback the transaction
4652        session.rollback().unwrap();
4653
4654        // The node should be discarded
4655        let count_after = db.node_count();
4656        assert_eq!(
4657            count_after, 1,
4658            "Rollback should discard node created via session.create_node_with_props()"
4659        );
4660    }
4661
4662    #[cfg(feature = "gql")]
4663    mod gql_tests {
4664        use super::*;
4665
4666        #[test]
4667        fn test_gql_query_execution() {
4668            let db = GrafeoDB::new_in_memory();
4669            let session = db.session();
4670
4671            // Create some test data
4672            session.create_node(&["Person"]);
4673            session.create_node(&["Person"]);
4674            session.create_node(&["Animal"]);
4675
4676            // Execute a GQL query
4677            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
4678
4679            // Should return 2 Person nodes
4680            assert_eq!(result.row_count(), 2);
4681            assert_eq!(result.column_count(), 1);
4682            assert_eq!(result.columns[0], "n");
4683        }
4684
4685        #[test]
4686        fn test_gql_empty_result() {
4687            let db = GrafeoDB::new_in_memory();
4688            let session = db.session();
4689
4690            // No data in database
4691            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
4692
4693            assert_eq!(result.row_count(), 0);
4694        }
4695
4696        #[test]
4697        fn test_gql_parse_error() {
4698            let db = GrafeoDB::new_in_memory();
4699            let session = db.session();
4700
4701            // Invalid GQL syntax
4702            let result = session.execute("MATCH (n RETURN n");
4703
4704            assert!(result.is_err());
4705        }
4706
4707        #[test]
4708        fn test_gql_relationship_traversal() {
4709            let db = GrafeoDB::new_in_memory();
4710            let session = db.session();
4711
4712            // Create a graph: Alix -> Gus, Alix -> Vincent
4713            let alix = session.create_node(&["Person"]);
4714            let gus = session.create_node(&["Person"]);
4715            let vincent = session.create_node(&["Person"]);
4716
4717            session.create_edge(alix, gus, "KNOWS");
4718            session.create_edge(alix, vincent, "KNOWS");
4719
4720            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
4721            let result = session
4722                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4723                .unwrap();
4724
4725            // Should return 2 rows (Alix->Gus, Alix->Vincent)
4726            assert_eq!(result.row_count(), 2);
4727            assert_eq!(result.column_count(), 2);
4728            assert_eq!(result.columns[0], "a");
4729            assert_eq!(result.columns[1], "b");
4730        }
4731
4732        #[test]
4733        fn test_gql_relationship_with_type_filter() {
4734            let db = GrafeoDB::new_in_memory();
4735            let session = db.session();
4736
4737            // Create a graph: Alix -KNOWS-> Gus, Alix -WORKS_WITH-> Vincent
4738            let alix = session.create_node(&["Person"]);
4739            let gus = session.create_node(&["Person"]);
4740            let vincent = session.create_node(&["Person"]);
4741
4742            session.create_edge(alix, gus, "KNOWS");
4743            session.create_edge(alix, vincent, "WORKS_WITH");
4744
4745            // Query only KNOWS relationships
4746            let result = session
4747                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4748                .unwrap();
4749
4750            // Should return only 1 row (Alix->Gus)
4751            assert_eq!(result.row_count(), 1);
4752        }
4753
4754        #[test]
4755        fn test_gql_semantic_error_undefined_variable() {
4756            let db = GrafeoDB::new_in_memory();
4757            let session = db.session();
4758
4759            // Reference undefined variable 'x' in RETURN
4760            let result = session.execute("MATCH (n:Person) RETURN x");
4761
4762            // Should fail with semantic error
4763            assert!(result.is_err());
4764            let Err(err) = result else {
4765                panic!("Expected error")
4766            };
4767            assert!(
4768                err.to_string().contains("Undefined variable"),
4769                "Expected undefined variable error, got: {}",
4770                err
4771            );
4772        }
4773
4774        #[test]
4775        fn test_gql_where_clause_property_filter() {
4776            use grafeo_common::types::Value;
4777
4778            let db = GrafeoDB::new_in_memory();
4779            let session = db.session();
4780
4781            // Create people with ages
4782            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
4783            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
4784            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
4785
4786            // Query with WHERE clause: age > 30
4787            let result = session
4788                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
4789                .unwrap();
4790
4791            // Should return 2 people (ages 35 and 45)
4792            assert_eq!(result.row_count(), 2);
4793        }
4794
4795        #[test]
4796        fn test_gql_where_clause_equality() {
4797            use grafeo_common::types::Value;
4798
4799            let db = GrafeoDB::new_in_memory();
4800            let session = db.session();
4801
4802            // Create people with names
4803            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4804            session.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
4805            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4806
4807            // Query with WHERE clause: name = "Alix"
4808            let result = session
4809                .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
4810                .unwrap();
4811
4812            // Should return 2 people named Alix
4813            assert_eq!(result.row_count(), 2);
4814        }
4815
4816        #[test]
4817        fn test_gql_return_property_access() {
4818            use grafeo_common::types::Value;
4819
4820            let db = GrafeoDB::new_in_memory();
4821            let session = db.session();
4822
4823            // Create people with names and ages
4824            session.create_node_with_props(
4825                &["Person"],
4826                [
4827                    ("name", Value::String("Alix".into())),
4828                    ("age", Value::Int64(30)),
4829                ],
4830            );
4831            session.create_node_with_props(
4832                &["Person"],
4833                [
4834                    ("name", Value::String("Gus".into())),
4835                    ("age", Value::Int64(25)),
4836                ],
4837            );
4838
4839            // Query returning properties
4840            let result = session
4841                .execute("MATCH (n:Person) RETURN n.name, n.age")
4842                .unwrap();
4843
4844            // Should return 2 rows with name and age columns
4845            assert_eq!(result.row_count(), 2);
4846            assert_eq!(result.column_count(), 2);
4847            assert_eq!(result.columns[0], "n.name");
4848            assert_eq!(result.columns[1], "n.age");
4849
4850            // Check that we get actual values
4851            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
4852            assert!(names.contains(&&Value::String("Alix".into())));
4853            assert!(names.contains(&&Value::String("Gus".into())));
4854        }
4855
4856        #[test]
4857        fn test_gql_return_mixed_expressions() {
4858            use grafeo_common::types::Value;
4859
4860            let db = GrafeoDB::new_in_memory();
4861            let session = db.session();
4862
4863            // Create a person
4864            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4865
4866            // Query returning both node and property
4867            let result = session
4868                .execute("MATCH (n:Person) RETURN n, n.name")
4869                .unwrap();
4870
4871            assert_eq!(result.row_count(), 1);
4872            assert_eq!(result.column_count(), 2);
4873            assert_eq!(result.columns[0], "n");
4874            assert_eq!(result.columns[1], "n.name");
4875
4876            // Second column should be the name
4877            assert_eq!(result.rows[0][1], Value::String("Alix".into()));
4878        }
4879    }
4880
4881    #[cfg(feature = "cypher")]
4882    mod cypher_tests {
4883        use super::*;
4884
4885        #[test]
4886        fn test_cypher_query_execution() {
4887            let db = GrafeoDB::new_in_memory();
4888            let session = db.session();
4889
4890            // Create some test data
4891            session.create_node(&["Person"]);
4892            session.create_node(&["Person"]);
4893            session.create_node(&["Animal"]);
4894
4895            // Execute a Cypher query
4896            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4897
4898            // Should return 2 Person nodes
4899            assert_eq!(result.row_count(), 2);
4900            assert_eq!(result.column_count(), 1);
4901            assert_eq!(result.columns[0], "n");
4902        }
4903
4904        #[test]
4905        fn test_cypher_empty_result() {
4906            let db = GrafeoDB::new_in_memory();
4907            let session = db.session();
4908
4909            // No data in database
4910            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4911
4912            assert_eq!(result.row_count(), 0);
4913        }
4914
4915        #[test]
4916        fn test_cypher_parse_error() {
4917            let db = GrafeoDB::new_in_memory();
4918            let session = db.session();
4919
4920            // Invalid Cypher syntax
4921            let result = session.execute_cypher("MATCH (n RETURN n");
4922
4923            assert!(result.is_err());
4924        }
4925    }
4926
4927    // ==================== Direct Lookup API Tests ====================
4928
4929    mod direct_lookup_tests {
4930        use super::*;
4931        use grafeo_common::types::Value;
4932
4933        #[test]
4934        fn test_get_node() {
4935            let db = GrafeoDB::new_in_memory();
4936            let session = db.session();
4937
4938            let id = session.create_node(&["Person"]);
4939            let node = session.get_node(id);
4940
4941            assert!(node.is_some());
4942            let node = node.unwrap();
4943            assert_eq!(node.id, id);
4944        }
4945
4946        #[test]
4947        fn test_get_node_not_found() {
4948            use grafeo_common::types::NodeId;
4949
4950            let db = GrafeoDB::new_in_memory();
4951            let session = db.session();
4952
4953            // Try to get a non-existent node
4954            let node = session.get_node(NodeId::new(9999));
4955            assert!(node.is_none());
4956        }
4957
4958        #[test]
4959        fn test_get_node_property() {
4960            let db = GrafeoDB::new_in_memory();
4961            let session = db.session();
4962
4963            let id = session
4964                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4965
4966            let name = session.get_node_property(id, "name");
4967            assert_eq!(name, Some(Value::String("Alix".into())));
4968
4969            // Non-existent property
4970            let missing = session.get_node_property(id, "missing");
4971            assert!(missing.is_none());
4972        }
4973
4974        #[test]
4975        fn test_get_edge() {
4976            let db = GrafeoDB::new_in_memory();
4977            let session = db.session();
4978
4979            let alix = session.create_node(&["Person"]);
4980            let gus = session.create_node(&["Person"]);
4981            let edge_id = session.create_edge(alix, gus, "KNOWS");
4982
4983            let edge = session.get_edge(edge_id);
4984            assert!(edge.is_some());
4985            let edge = edge.unwrap();
4986            assert_eq!(edge.id, edge_id);
4987            assert_eq!(edge.src, alix);
4988            assert_eq!(edge.dst, gus);
4989        }
4990
4991        #[test]
4992        fn test_get_edge_not_found() {
4993            use grafeo_common::types::EdgeId;
4994
4995            let db = GrafeoDB::new_in_memory();
4996            let session = db.session();
4997
4998            let edge = session.get_edge(EdgeId::new(9999));
4999            assert!(edge.is_none());
5000        }
5001
5002        #[test]
5003        fn test_get_neighbors_outgoing() {
5004            let db = GrafeoDB::new_in_memory();
5005            let session = db.session();
5006
5007            let alix = session.create_node(&["Person"]);
5008            let gus = session.create_node(&["Person"]);
5009            let harm = session.create_node(&["Person"]);
5010
5011            session.create_edge(alix, gus, "KNOWS");
5012            session.create_edge(alix, harm, "KNOWS");
5013
5014            let neighbors = session.get_neighbors_outgoing(alix);
5015            assert_eq!(neighbors.len(), 2);
5016
5017            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5018            assert!(neighbor_ids.contains(&gus));
5019            assert!(neighbor_ids.contains(&harm));
5020        }
5021
5022        #[test]
5023        fn test_get_neighbors_incoming() {
5024            let db = GrafeoDB::new_in_memory();
5025            let session = db.session();
5026
5027            let alix = session.create_node(&["Person"]);
5028            let gus = session.create_node(&["Person"]);
5029            let harm = session.create_node(&["Person"]);
5030
5031            session.create_edge(gus, alix, "KNOWS");
5032            session.create_edge(harm, alix, "KNOWS");
5033
5034            let neighbors = session.get_neighbors_incoming(alix);
5035            assert_eq!(neighbors.len(), 2);
5036
5037            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5038            assert!(neighbor_ids.contains(&gus));
5039            assert!(neighbor_ids.contains(&harm));
5040        }
5041
5042        #[test]
5043        fn test_get_neighbors_outgoing_by_type() {
5044            let db = GrafeoDB::new_in_memory();
5045            let session = db.session();
5046
5047            let alix = session.create_node(&["Person"]);
5048            let gus = session.create_node(&["Person"]);
5049            let company = session.create_node(&["Company"]);
5050
5051            session.create_edge(alix, gus, "KNOWS");
5052            session.create_edge(alix, company, "WORKS_AT");
5053
5054            let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
5055            assert_eq!(knows_neighbors.len(), 1);
5056            assert_eq!(knows_neighbors[0].0, gus);
5057
5058            let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
5059            assert_eq!(works_neighbors.len(), 1);
5060            assert_eq!(works_neighbors[0].0, company);
5061
5062            // No edges of this type
5063            let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
5064            assert!(no_neighbors.is_empty());
5065        }
5066
5067        #[test]
5068        fn test_node_exists() {
5069            use grafeo_common::types::NodeId;
5070
5071            let db = GrafeoDB::new_in_memory();
5072            let session = db.session();
5073
5074            let id = session.create_node(&["Person"]);
5075
5076            assert!(session.node_exists(id));
5077            assert!(!session.node_exists(NodeId::new(9999)));
5078        }
5079
5080        #[test]
5081        fn test_edge_exists() {
5082            use grafeo_common::types::EdgeId;
5083
5084            let db = GrafeoDB::new_in_memory();
5085            let session = db.session();
5086
5087            let alix = session.create_node(&["Person"]);
5088            let gus = session.create_node(&["Person"]);
5089            let edge_id = session.create_edge(alix, gus, "KNOWS");
5090
5091            assert!(session.edge_exists(edge_id));
5092            assert!(!session.edge_exists(EdgeId::new(9999)));
5093        }
5094
5095        #[test]
5096        fn test_get_degree() {
5097            let db = GrafeoDB::new_in_memory();
5098            let session = db.session();
5099
5100            let alix = session.create_node(&["Person"]);
5101            let gus = session.create_node(&["Person"]);
5102            let harm = session.create_node(&["Person"]);
5103
5104            // Alix knows Gus and Harm (2 outgoing)
5105            session.create_edge(alix, gus, "KNOWS");
5106            session.create_edge(alix, harm, "KNOWS");
5107            // Gus knows Alix (1 incoming for Alix)
5108            session.create_edge(gus, alix, "KNOWS");
5109
5110            let (out_degree, in_degree) = session.get_degree(alix);
5111            assert_eq!(out_degree, 2);
5112            assert_eq!(in_degree, 1);
5113
5114            // Node with no edges
5115            let lonely = session.create_node(&["Person"]);
5116            let (out, in_deg) = session.get_degree(lonely);
5117            assert_eq!(out, 0);
5118            assert_eq!(in_deg, 0);
5119        }
5120
5121        #[test]
5122        fn test_get_nodes_batch() {
5123            let db = GrafeoDB::new_in_memory();
5124            let session = db.session();
5125
5126            let alix = session.create_node(&["Person"]);
5127            let gus = session.create_node(&["Person"]);
5128            let harm = session.create_node(&["Person"]);
5129
5130            let nodes = session.get_nodes_batch(&[alix, gus, harm]);
5131            assert_eq!(nodes.len(), 3);
5132            assert!(nodes[0].is_some());
5133            assert!(nodes[1].is_some());
5134            assert!(nodes[2].is_some());
5135
5136            // With non-existent node
5137            use grafeo_common::types::NodeId;
5138            let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
5139            assert_eq!(nodes_with_missing.len(), 3);
5140            assert!(nodes_with_missing[0].is_some());
5141            assert!(nodes_with_missing[1].is_none()); // Missing node
5142            assert!(nodes_with_missing[2].is_some());
5143        }
5144
5145        #[test]
5146        fn test_auto_commit_setting() {
5147            let db = GrafeoDB::new_in_memory();
5148            let mut session = db.session();
5149
5150            // Default is auto-commit enabled
5151            assert!(session.auto_commit());
5152
5153            session.set_auto_commit(false);
5154            assert!(!session.auto_commit());
5155
5156            session.set_auto_commit(true);
5157            assert!(session.auto_commit());
5158        }
5159
5160        #[test]
5161        fn test_transaction_double_begin_nests() {
5162            let db = GrafeoDB::new_in_memory();
5163            let mut session = db.session();
5164
5165            session.begin_transaction().unwrap();
5166            // Second begin_transaction creates a nested transaction (auto-savepoint)
5167            let result = session.begin_transaction();
5168            assert!(result.is_ok());
5169            // Commit the inner (releases savepoint)
5170            session.commit().unwrap();
5171            // Commit the outer
5172            session.commit().unwrap();
5173        }
5174
5175        #[test]
5176        fn test_commit_without_transaction_error() {
5177            let db = GrafeoDB::new_in_memory();
5178            let mut session = db.session();
5179
5180            let result = session.commit();
5181            assert!(result.is_err());
5182        }
5183
5184        #[test]
5185        fn test_rollback_without_transaction_error() {
5186            let db = GrafeoDB::new_in_memory();
5187            let mut session = db.session();
5188
5189            let result = session.rollback();
5190            assert!(result.is_err());
5191        }
5192
5193        #[test]
5194        fn test_create_edge_in_transaction() {
5195            let db = GrafeoDB::new_in_memory();
5196            let mut session = db.session();
5197
5198            // Create nodes outside transaction
5199            let alix = session.create_node(&["Person"]);
5200            let gus = session.create_node(&["Person"]);
5201
5202            // Create edge in transaction
5203            session.begin_transaction().unwrap();
5204            let edge_id = session.create_edge(alix, gus, "KNOWS");
5205
5206            // Edge should be visible in the transaction
5207            assert!(session.edge_exists(edge_id));
5208
5209            // Commit
5210            session.commit().unwrap();
5211
5212            // Edge should still be visible
5213            assert!(session.edge_exists(edge_id));
5214        }
5215
5216        #[test]
5217        fn test_neighbors_empty_node() {
5218            let db = GrafeoDB::new_in_memory();
5219            let session = db.session();
5220
5221            let lonely = session.create_node(&["Person"]);
5222
5223            assert!(session.get_neighbors_outgoing(lonely).is_empty());
5224            assert!(session.get_neighbors_incoming(lonely).is_empty());
5225            assert!(
5226                session
5227                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
5228                    .is_empty()
5229            );
5230        }
5231    }
5232
5233    #[test]
5234    fn test_auto_gc_triggers_on_commit_interval() {
5235        use crate::config::Config;
5236
5237        let config = Config::in_memory().with_gc_interval(2);
5238        let db = GrafeoDB::with_config(config).unwrap();
5239        let mut session = db.session();
5240
5241        // First commit: counter = 1, no GC (not a multiple of 2)
5242        session.begin_transaction().unwrap();
5243        session.create_node(&["A"]);
5244        session.commit().unwrap();
5245
5246        // Second commit: counter = 2, GC should trigger (multiple of 2)
5247        session.begin_transaction().unwrap();
5248        session.create_node(&["B"]);
5249        session.commit().unwrap();
5250
5251        // Verify the database is still functional after GC
5252        assert_eq!(db.node_count(), 2);
5253    }
5254
5255    #[test]
5256    fn test_query_timeout_config_propagates_to_session() {
5257        use crate::config::Config;
5258        use std::time::Duration;
5259
5260        let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
5261        let db = GrafeoDB::with_config(config).unwrap();
5262        let session = db.session();
5263
5264        // Verify the session has a query deadline (timeout was set)
5265        assert!(session.query_deadline().is_some());
5266    }
5267
5268    #[test]
5269    fn test_no_query_timeout_returns_no_deadline() {
5270        let db = GrafeoDB::new_in_memory();
5271        let session = db.session();
5272
5273        // Default config has no timeout
5274        assert!(session.query_deadline().is_none());
5275    }
5276
5277    #[test]
5278    fn test_graph_model_accessor() {
5279        use crate::config::GraphModel;
5280
5281        let db = GrafeoDB::new_in_memory();
5282        let session = db.session();
5283
5284        assert_eq!(session.graph_model(), GraphModel::Lpg);
5285    }
5286
5287    #[cfg(feature = "gql")]
5288    #[test]
5289    fn test_external_store_session() {
5290        use grafeo_core::graph::GraphStoreMut;
5291        use std::sync::Arc;
5292
5293        let config = crate::config::Config::in_memory();
5294        let store =
5295            Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
5296        let db = GrafeoDB::with_store(store, config).unwrap();
5297
5298        let mut session = db.session();
5299
5300        // Use an explicit transaction so that INSERT and MATCH share the same
5301        // transaction context. With PENDING epochs, uncommitted versions are
5302        // only visible to the owning transaction.
5303        session.begin_transaction().unwrap();
5304        session.execute("INSERT (:Test {name: 'hello'})").unwrap();
5305
5306        // Verify we can query through it within the same transaction
5307        let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
5308        assert_eq!(result.row_count(), 1);
5309
5310        session.commit().unwrap();
5311    }
5312
5313    // ==================== Session Command Tests ====================
5314
5315    #[cfg(feature = "gql")]
5316    mod session_command_tests {
5317        use super::*;
5318        use grafeo_common::types::Value;
5319
5320        #[test]
5321        fn test_use_graph_sets_current_graph() {
5322            let db = GrafeoDB::new_in_memory();
5323            let session = db.session();
5324
5325            // Create the graph first, then USE it
5326            session.execute("CREATE GRAPH mydb").unwrap();
5327            session.execute("USE GRAPH mydb").unwrap();
5328
5329            assert_eq!(session.current_graph(), Some("mydb".to_string()));
5330        }
5331
5332        #[test]
5333        fn test_use_graph_nonexistent_errors() {
5334            let db = GrafeoDB::new_in_memory();
5335            let session = db.session();
5336
5337            let result = session.execute("USE GRAPH doesnotexist");
5338            assert!(result.is_err());
5339            let err = result.unwrap_err().to_string();
5340            assert!(
5341                err.contains("does not exist"),
5342                "Expected 'does not exist' error, got: {err}"
5343            );
5344        }
5345
5346        #[test]
5347        fn test_use_graph_default_always_valid() {
5348            let db = GrafeoDB::new_in_memory();
5349            let session = db.session();
5350
5351            // "default" is always valid, even without CREATE GRAPH
5352            session.execute("USE GRAPH default").unwrap();
5353            assert_eq!(session.current_graph(), Some("default".to_string()));
5354        }
5355
5356        #[test]
5357        fn test_session_set_graph() {
5358            let db = GrafeoDB::new_in_memory();
5359            let session = db.session();
5360
5361            session.execute("CREATE GRAPH analytics").unwrap();
5362            session.execute("SESSION SET GRAPH analytics").unwrap();
5363            assert_eq!(session.current_graph(), Some("analytics".to_string()));
5364        }
5365
5366        #[test]
5367        fn test_session_set_graph_nonexistent_errors() {
5368            let db = GrafeoDB::new_in_memory();
5369            let session = db.session();
5370
5371            let result = session.execute("SESSION SET GRAPH nosuchgraph");
5372            assert!(result.is_err());
5373        }
5374
5375        #[test]
5376        fn test_session_set_time_zone() {
5377            let db = GrafeoDB::new_in_memory();
5378            let session = db.session();
5379
5380            assert_eq!(session.time_zone(), None);
5381
5382            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5383            assert_eq!(session.time_zone(), Some("UTC".to_string()));
5384
5385            session
5386                .execute("SESSION SET TIME ZONE 'America/New_York'")
5387                .unwrap();
5388            assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
5389        }
5390
5391        #[test]
5392        fn test_session_set_parameter() {
5393            let db = GrafeoDB::new_in_memory();
5394            let session = db.session();
5395
5396            session
5397                .execute("SESSION SET PARAMETER $timeout = 30")
5398                .unwrap();
5399
5400            // Parameter is stored (value is Null for now, since expression
5401            // evaluation is not yet wired up)
5402            assert!(session.get_parameter("timeout").is_some());
5403        }
5404
5405        #[test]
5406        fn test_session_reset_clears_all_state() {
5407            let db = GrafeoDB::new_in_memory();
5408            let session = db.session();
5409
5410            // Set various session state
5411            session.execute("CREATE GRAPH analytics").unwrap();
5412            session.execute("SESSION SET GRAPH analytics").unwrap();
5413            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5414            session
5415                .execute("SESSION SET PARAMETER $limit = 100")
5416                .unwrap();
5417
5418            // Verify state was set
5419            assert!(session.current_graph().is_some());
5420            assert!(session.time_zone().is_some());
5421            assert!(session.get_parameter("limit").is_some());
5422
5423            // Reset everything
5424            session.execute("SESSION RESET").unwrap();
5425
5426            assert_eq!(session.current_graph(), None);
5427            assert_eq!(session.time_zone(), None);
5428            assert!(session.get_parameter("limit").is_none());
5429        }
5430
5431        #[test]
5432        fn test_session_close_clears_state() {
5433            let db = GrafeoDB::new_in_memory();
5434            let session = db.session();
5435
5436            session.execute("CREATE GRAPH analytics").unwrap();
5437            session.execute("SESSION SET GRAPH analytics").unwrap();
5438            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5439
5440            session.execute("SESSION CLOSE").unwrap();
5441
5442            assert_eq!(session.current_graph(), None);
5443            assert_eq!(session.time_zone(), None);
5444        }
5445
5446        #[test]
5447        fn test_create_graph() {
5448            let db = GrafeoDB::new_in_memory();
5449            let session = db.session();
5450
5451            session.execute("CREATE GRAPH mydb").unwrap();
5452
5453            // Should be able to USE it now
5454            session.execute("USE GRAPH mydb").unwrap();
5455            assert_eq!(session.current_graph(), Some("mydb".to_string()));
5456        }
5457
5458        #[test]
5459        fn test_create_graph_duplicate_errors() {
5460            let db = GrafeoDB::new_in_memory();
5461            let session = db.session();
5462
5463            session.execute("CREATE GRAPH mydb").unwrap();
5464            let result = session.execute("CREATE GRAPH mydb");
5465
5466            assert!(result.is_err());
5467            let err = result.unwrap_err().to_string();
5468            assert!(
5469                err.contains("already exists"),
5470                "Expected 'already exists' error, got: {err}"
5471            );
5472        }
5473
5474        #[test]
5475        fn test_create_graph_if_not_exists() {
5476            let db = GrafeoDB::new_in_memory();
5477            let session = db.session();
5478
5479            session.execute("CREATE GRAPH mydb").unwrap();
5480            // Should succeed silently with IF NOT EXISTS
5481            session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
5482        }
5483
5484        #[test]
5485        fn test_drop_graph() {
5486            let db = GrafeoDB::new_in_memory();
5487            let session = db.session();
5488
5489            session.execute("CREATE GRAPH mydb").unwrap();
5490            session.execute("DROP GRAPH mydb").unwrap();
5491
5492            // Should no longer be usable
5493            let result = session.execute("USE GRAPH mydb");
5494            assert!(result.is_err());
5495        }
5496
5497        #[test]
5498        fn test_drop_graph_nonexistent_errors() {
5499            let db = GrafeoDB::new_in_memory();
5500            let session = db.session();
5501
5502            let result = session.execute("DROP GRAPH nosuchgraph");
5503            assert!(result.is_err());
5504            let err = result.unwrap_err().to_string();
5505            assert!(
5506                err.contains("does not exist"),
5507                "Expected 'does not exist' error, got: {err}"
5508            );
5509        }
5510
5511        #[test]
5512        fn test_drop_graph_if_exists() {
5513            let db = GrafeoDB::new_in_memory();
5514            let session = db.session();
5515
5516            // Should succeed silently with IF EXISTS
5517            session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
5518        }
5519
5520        #[test]
5521        fn test_start_transaction_via_gql() {
5522            let db = GrafeoDB::new_in_memory();
5523            let session = db.session();
5524
5525            session.execute("START TRANSACTION").unwrap();
5526            assert!(session.in_transaction());
5527            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5528            session.execute("COMMIT").unwrap();
5529            assert!(!session.in_transaction());
5530
5531            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5532            assert_eq!(result.rows.len(), 1);
5533        }
5534
5535        #[test]
5536        fn test_start_transaction_read_only_blocks_insert() {
5537            let db = GrafeoDB::new_in_memory();
5538            let session = db.session();
5539
5540            session.execute("START TRANSACTION READ ONLY").unwrap();
5541            let result = session.execute("INSERT (:Person {name: 'Alix'})");
5542            assert!(result.is_err());
5543            let err = result.unwrap_err().to_string();
5544            assert!(
5545                err.contains("read-only"),
5546                "Expected read-only error, got: {err}"
5547            );
5548            session.execute("ROLLBACK").unwrap();
5549        }
5550
5551        #[test]
5552        fn test_start_transaction_read_only_allows_reads() {
5553            let db = GrafeoDB::new_in_memory();
5554            let mut session = db.session();
5555            session.begin_transaction().unwrap();
5556            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5557            session.commit().unwrap();
5558
5559            session.execute("START TRANSACTION READ ONLY").unwrap();
5560            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5561            assert_eq!(result.rows.len(), 1);
5562            session.execute("COMMIT").unwrap();
5563        }
5564
5565        #[test]
5566        fn test_rollback_via_gql() {
5567            let db = GrafeoDB::new_in_memory();
5568            let session = db.session();
5569
5570            session.execute("START TRANSACTION").unwrap();
5571            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5572            session.execute("ROLLBACK").unwrap();
5573
5574            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5575            assert!(result.rows.is_empty());
5576        }
5577
5578        #[test]
5579        fn test_start_transaction_with_isolation_level() {
5580            let db = GrafeoDB::new_in_memory();
5581            let session = db.session();
5582
5583            session
5584                .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
5585                .unwrap();
5586            assert!(session.in_transaction());
5587            session.execute("ROLLBACK").unwrap();
5588        }
5589
5590        #[test]
5591        fn test_session_commands_return_empty_result() {
5592            let db = GrafeoDB::new_in_memory();
5593            let session = db.session();
5594
5595            session.execute("CREATE GRAPH test").unwrap();
5596            let result = session.execute("SESSION SET GRAPH test").unwrap();
5597            assert_eq!(result.row_count(), 0);
5598            assert_eq!(result.column_count(), 0);
5599        }
5600
5601        #[test]
5602        fn test_current_graph_default_is_none() {
5603            let db = GrafeoDB::new_in_memory();
5604            let session = db.session();
5605
5606            assert_eq!(session.current_graph(), None);
5607        }
5608
5609        #[test]
5610        fn test_time_zone_default_is_none() {
5611            let db = GrafeoDB::new_in_memory();
5612            let session = db.session();
5613
5614            assert_eq!(session.time_zone(), None);
5615        }
5616
5617        #[test]
5618        fn test_session_state_independent_across_sessions() {
5619            let db = GrafeoDB::new_in_memory();
5620            let session1 = db.session();
5621            let session2 = db.session();
5622
5623            session1.execute("CREATE GRAPH first").unwrap();
5624            session1.execute("CREATE GRAPH second").unwrap();
5625            session1.execute("SESSION SET GRAPH first").unwrap();
5626            session2.execute("SESSION SET GRAPH second").unwrap();
5627
5628            assert_eq!(session1.current_graph(), Some("first".to_string()));
5629            assert_eq!(session2.current_graph(), Some("second".to_string()));
5630        }
5631
5632        #[test]
5633        fn test_show_node_types() {
5634            let db = GrafeoDB::new_in_memory();
5635            let session = db.session();
5636
5637            session
5638                .execute("CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)")
5639                .unwrap();
5640
5641            let result = session.execute("SHOW NODE TYPES").unwrap();
5642            assert_eq!(
5643                result.columns,
5644                vec!["name", "properties", "constraints", "parents"]
5645            );
5646            assert_eq!(result.rows.len(), 1);
5647            // First column is the type name
5648            assert_eq!(result.rows[0][0], Value::from("Person"));
5649        }
5650
5651        #[test]
5652        fn test_show_edge_types() {
5653            let db = GrafeoDB::new_in_memory();
5654            let session = db.session();
5655
5656            session
5657                .execute("CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)")
5658                .unwrap();
5659
5660            let result = session.execute("SHOW EDGE TYPES").unwrap();
5661            assert_eq!(
5662                result.columns,
5663                vec!["name", "properties", "source_types", "target_types"]
5664            );
5665            assert_eq!(result.rows.len(), 1);
5666            assert_eq!(result.rows[0][0], Value::from("KNOWS"));
5667        }
5668
5669        #[test]
5670        fn test_show_graph_types() {
5671            let db = GrafeoDB::new_in_memory();
5672            let session = db.session();
5673
5674            session
5675                .execute("CREATE NODE TYPE Person (name STRING)")
5676                .unwrap();
5677            session
5678                .execute(
5679                    "CREATE GRAPH TYPE social (\
5680                        NODE TYPE Person (name STRING)\
5681                    )",
5682                )
5683                .unwrap();
5684
5685            let result = session.execute("SHOW GRAPH TYPES").unwrap();
5686            assert_eq!(
5687                result.columns,
5688                vec!["name", "open", "node_types", "edge_types"]
5689            );
5690            assert_eq!(result.rows.len(), 1);
5691            assert_eq!(result.rows[0][0], Value::from("social"));
5692        }
5693
5694        #[test]
5695        fn test_show_graph_type_named() {
5696            let db = GrafeoDB::new_in_memory();
5697            let session = db.session();
5698
5699            session
5700                .execute("CREATE NODE TYPE Person (name STRING)")
5701                .unwrap();
5702            session
5703                .execute(
5704                    "CREATE GRAPH TYPE social (\
5705                        NODE TYPE Person (name STRING)\
5706                    )",
5707                )
5708                .unwrap();
5709
5710            let result = session.execute("SHOW GRAPH TYPE social").unwrap();
5711            assert_eq!(result.rows.len(), 1);
5712            assert_eq!(result.rows[0][0], Value::from("social"));
5713        }
5714
5715        #[test]
5716        fn test_show_graph_type_not_found() {
5717            let db = GrafeoDB::new_in_memory();
5718            let session = db.session();
5719
5720            let result = session.execute("SHOW GRAPH TYPE nonexistent");
5721            assert!(result.is_err());
5722        }
5723
5724        #[test]
5725        fn test_show_indexes_via_gql() {
5726            let db = GrafeoDB::new_in_memory();
5727            let session = db.session();
5728
5729            let result = session.execute("SHOW INDEXES").unwrap();
5730            assert_eq!(result.columns, vec!["name", "type", "label", "property"]);
5731        }
5732
5733        #[test]
5734        fn test_show_constraints_via_gql() {
5735            let db = GrafeoDB::new_in_memory();
5736            let session = db.session();
5737
5738            let result = session.execute("SHOW CONSTRAINTS").unwrap();
5739            assert_eq!(result.columns, vec!["name", "type", "label", "properties"]);
5740        }
5741
5742        #[test]
5743        fn test_pattern_form_graph_type_roundtrip() {
5744            let db = GrafeoDB::new_in_memory();
5745            let session = db.session();
5746
5747            // Register the types first
5748            session
5749                .execute("CREATE NODE TYPE Person (name STRING NOT NULL)")
5750                .unwrap();
5751            session
5752                .execute("CREATE NODE TYPE City (name STRING)")
5753                .unwrap();
5754            session
5755                .execute("CREATE EDGE TYPE KNOWS (since INTEGER)")
5756                .unwrap();
5757            session.execute("CREATE EDGE TYPE LIVES_IN").unwrap();
5758
5759            // Create graph type using pattern form
5760            session
5761                .execute(
5762                    "CREATE GRAPH TYPE social (\
5763                        (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
5764                        (:Person)-[:LIVES_IN]->(:City)\
5765                    )",
5766                )
5767                .unwrap();
5768
5769            // Verify it was created
5770            let result = session.execute("SHOW GRAPH TYPE social").unwrap();
5771            assert_eq!(result.rows.len(), 1);
5772            assert_eq!(result.rows[0][0], Value::from("social"));
5773        }
5774    }
5775}