Skip to main content

grafeo_engine/session/
mod.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7#[cfg(feature = "triple-store")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::AtomicUsize;
12#[cfg(feature = "lpg")]
13use std::sync::atomic::Ordering;
14use std::time::{Duration, Instant};
15
16#[cfg(feature = "lpg")]
17use grafeo_common::grafeo_debug_span;
18#[cfg(feature = "lpg")]
19use grafeo_common::types::{EdgeId, NodeId};
20use grafeo_common::types::{EpochId, TransactionId, Value};
21use grafeo_common::utils::error::Result;
22use grafeo_common::{grafeo_info_span, grafeo_warn};
23#[cfg(feature = "lpg")]
24use grafeo_core::graph::Direction;
25#[cfg(feature = "lpg")]
26use grafeo_core::graph::lpg::LpgStore;
27#[cfg(feature = "lpg")]
28use grafeo_core::graph::lpg::{Edge, Node};
29#[cfg(feature = "triple-store")]
30use grafeo_core::graph::rdf::RdfStore;
31use grafeo_core::graph::{GraphStore, GraphStoreMut};
32
33use crate::catalog::{Catalog, CatalogConstraintValidator};
34use crate::config::{AdaptiveConfig, GraphModel};
35use crate::database::QueryResult;
36use crate::query::Executor;
37use crate::query::cache::QueryCache;
38use crate::transaction::TransactionManager;
39
/// Storage key suffix for the implicit default graph within a schema.
/// Auto-created by `CREATE SCHEMA` and auto-dropped by `DROP SCHEMA`.
///
/// Session key resolution joins it to the schema name as
/// `{schema}/__default__` whenever a schema is set and either no graph or
/// the graph named `"default"` is selected.
const SCHEMA_DEFAULT_GRAPH: &str = "__default__";
43
44/// Parses a DDL default-value literal string into a [`Value`].
45///
46/// Handles string literals (single- or double-quoted), integers, floats,
47/// booleans (`true`/`false`), and `NULL`.
48fn parse_default_literal(text: &str) -> Value {
49    if text.eq_ignore_ascii_case("null") {
50        return Value::Null;
51    }
52    if text.eq_ignore_ascii_case("true") {
53        return Value::Bool(true);
54    }
55    if text.eq_ignore_ascii_case("false") {
56        return Value::Bool(false);
57    }
58    // String literal: strip surrounding quotes
59    if (text.starts_with('\'') && text.ends_with('\''))
60        || (text.starts_with('"') && text.ends_with('"'))
61    {
62        return Value::String(text[1..text.len() - 1].into());
63    }
64    // Try integer, then float
65    if let Ok(i) = text.parse::<i64>() {
66        return Value::Int64(i);
67    }
68    if let Ok(f) = text.parse::<f64>() {
69        return Value::Float64(f);
70    }
71    // Fallback: treat as string
72    Value::String(text.into())
73}
74
/// Runtime configuration for creating a new session.
///
/// Groups the shared parameters passed to all session constructors, keeping
/// call sites readable and avoiding long argument lists.
///
/// The constructors move each field into the [`Session`] field of the same
/// name; `read_only` seeds both `db_read_only` and the initial `read_only_tx`.
pub(crate) struct SessionConfig {
    /// Transaction manager handed to the session.
    pub transaction_manager: Arc<TransactionManager>,
    /// Query cache shared across sessions.
    pub query_cache: Arc<QueryCache>,
    /// Schema and metadata catalog shared across sessions.
    pub catalog: Arc<Catalog>,
    /// Adaptive execution configuration.
    pub adaptive_config: AdaptiveConfig,
    /// Whether to use factorized execution for multi-hop queries.
    pub factorized_execution: bool,
    /// The graph data model the session operates on.
    pub graph_model: GraphModel,
    /// Maximum time a query may run before being cancelled (`None` = no limit set here).
    pub query_timeout: Option<Duration>,
    /// Maximum size in bytes for a single property value (`None` = no limit set here).
    pub max_property_size: Option<usize>,
    /// Buffer manager for memory-aware query execution.
    pub buffer_manager: Option<Arc<grafeo_common::memory::buffer::BufferManager>>,
    /// Shared commit counter for triggering auto-GC.
    pub commit_counter: Arc<AtomicUsize>,
    /// GC every N commits (0 = disabled).
    pub gc_interval: usize,
    /// When true, the session permanently blocks all mutations.
    pub read_only: bool,
    /// The identity bound to this session (for permission checks).
    pub identity: crate::auth::Identity,
    /// Named graph projections shared with the database.
    #[cfg(feature = "lpg")]
    pub projections: Arc<
        parking_lot::RwLock<
            std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>,
        >,
    >,
}
104
/// Your handle to the database - execute queries and manage transactions.
///
/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
/// tracks its own transaction state, so you can have multiple concurrent
/// sessions without them interfering.
///
/// Mutable per-session state lives behind `parking_lot::Mutex` fields so that
/// it can be changed through `&self`.
pub struct Session {
    /// The underlying concrete LPG store. Named graphs are looked up through
    /// it (`store.graph(name)`); it is also the fallback when a named graph
    /// cannot be found.
    #[cfg(feature = "lpg")]
    store: Arc<LpgStore>,
    /// Graph store trait object for pluggable storage backends (read path).
    graph_store: Arc<dyn GraphStore>,
    /// Writable graph store (None for read-only databases).
    graph_store_mut: Option<Arc<dyn GraphStoreMut>>,
    /// Schema and metadata catalog shared across sessions.
    catalog: Arc<Catalog>,
    /// RDF triple store (if RDF feature is enabled).
    #[cfg(feature = "triple-store")]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager.
    transaction_manager: Arc<TransactionManager>,
    /// Query cache shared across sessions.
    query_cache: Arc<QueryCache>,
    /// Current transaction ID (if any). Behind a Mutex so that GQL commands
    /// (`START TRANSACTION`, `COMMIT`, `ROLLBACK`) can manage transactions
    /// from within `execute(&self)`.
    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
    /// Whether the current transaction is read-only (blocks mutations).
    /// Initialised from the database read-only flag.
    read_only_tx: parking_lot::Mutex<bool>,
    /// Whether the database itself is read-only (set at open time, never changes).
    /// When true, `read_only_tx` is always true regardless of transaction flags.
    db_read_only: bool,
    /// The identity bound to this session (determines permission level).
    identity: crate::auth::Identity,
    /// Whether the session is in auto-commit mode. The constructors in this
    /// file start every session with auto-commit enabled.
    auto_commit: bool,
    /// Adaptive execution configuration.
    #[allow(dead_code)] // Stored for future adaptive re-optimization during execution
    adaptive_config: AdaptiveConfig,
    /// Whether to use factorized execution for multi-hop queries.
    factorized_execution: bool,
    /// The graph data model this session operates on.
    graph_model: GraphModel,
    /// Maximum time a query may run before being cancelled.
    query_timeout: Option<Duration>,
    /// Maximum size in bytes for a single property value.
    max_property_size: Option<usize>,
    /// Buffer manager for memory-aware execution (spill decisions).
    buffer_manager: Option<Arc<grafeo_common::memory::buffer::BufferManager>>,
    /// Shared commit counter for triggering auto-GC.
    commit_counter: Arc<AtomicUsize>,
    /// GC every N commits (0 = disabled).
    gc_interval: usize,
    /// Node count at the start of the current transaction (for PreparedCommit stats).
    transaction_start_node_count: AtomicUsize,
    /// Edge count at the start of the current transaction (for PreparedCommit stats).
    transaction_start_edge_count: AtomicUsize,
    /// WAL for logging schema changes. `None` until `set_wal` attaches one.
    #[cfg(feature = "wal")]
    wal: Option<Arc<grafeo_storage::wal::LpgWal>>,
    /// Shared WAL graph context tracker for named graph awareness.
    /// `None` until `set_wal` attaches one.
    #[cfg(feature = "wal")]
    wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
    /// CDC log for change tracking.
    #[cfg(feature = "cdc")]
    cdc_log: Arc<crate::cdc::CdcLog>,
    /// Buffered CDC events for the current transaction.
    /// Flushed to `cdc_log` on commit, discarded on rollback.
    /// `None` until `set_cdc_log` wraps a write store.
    #[cfg(feature = "cdc")]
    cdc_pending_events: Option<Arc<parking_lot::Mutex<Vec<crate::cdc::ChangeEvent>>>>,
    /// Current graph name (for multi-graph USE GRAPH support). None = default graph.
    current_graph: parking_lot::Mutex<Option<String>>,
    /// Current schema name (ISO/IEC 39075 Section 4.7.3: independent from session graph).
    /// None = "not set" (uses default schema).
    current_schema: parking_lot::Mutex<Option<String>>,
    /// Session time zone override.
    time_zone: parking_lot::Mutex<Option<String>>,
    /// Session-level parameters (SET PARAMETER).
    session_params:
        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
    /// Override epoch for time-travel queries (None = use transaction/current epoch).
    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
    /// Savepoints within the current transaction.
    savepoints: parking_lot::Mutex<Vec<SavepointState>>,
    /// Nesting depth for nested transactions (0 = outermost).
    /// Nested `START TRANSACTION` creates an auto-savepoint; nested `COMMIT`
    /// releases it, nested `ROLLBACK` rolls back to it.
    transaction_nesting_depth: parking_lot::Mutex<u32>,
    /// Named graphs touched during the current transaction (for cross-graph atomicity).
    /// `None` represents the default graph. Populated at `BEGIN` time and on each
    /// `USE GRAPH` / `SESSION SET GRAPH` switch within a transaction.
    /// Entries are full storage keys (schema-qualified where a schema is set).
    touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
    /// Shared metrics registry (populated when the `metrics` feature is enabled).
    /// `None` until `set_metrics` attaches one.
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
    /// Transaction start time for duration tracking.
    #[cfg(feature = "metrics")]
    tx_start_time: parking_lot::Mutex<Option<Instant>>,
    /// Named graph projections shared with the database.
    #[cfg(feature = "lpg")]
    projections: Arc<
        parking_lot::RwLock<
            std::collections::HashMap<String, Arc<grafeo_core::graph::GraphProjection>>,
        >,
    >,
}
210
/// Per-graph savepoint snapshot, capturing the store state at the time of the savepoint.
#[derive(Clone)]
struct GraphSavepoint {
    /// Graph this snapshot belongs to.
    /// NOTE(review): presumably `None` means the default graph, as in
    /// `Session::touched_graphs` - confirm against the savepoint code.
    graph_name: Option<String>,
    /// Next node ID recorded when the savepoint was taken.
    /// NOTE(review): presumably used to discard IDs allocated afterwards on
    /// rollback; the rollback code is not in this part of the file.
    next_node_id: u64,
    /// Next edge ID recorded when the savepoint was taken (see `next_node_id`).
    next_edge_id: u64,
    /// Position in the undo log when the savepoint was taken.
    /// NOTE(review): presumably entries past this position are undone on
    /// rollback-to-savepoint - confirm.
    undo_log_position: usize,
}
219
/// Savepoint state: name + per-graph snapshots + the graph that was active.
#[derive(Clone)]
struct SavepointState {
    /// Name identifying the savepoint.
    name: String,
    /// One snapshot per graph captured for this savepoint.
    graph_snapshots: Vec<GraphSavepoint>,
    /// The graph that was active when the savepoint was created.
    /// Reserved for future use (e.g., restoring graph context on rollback).
    #[allow(dead_code)]
    active_graph: Option<String>,
    /// CDC event buffer position at savepoint creation.
    /// On rollback-to-savepoint, the buffer is truncated to this position.
    #[cfg(feature = "cdc")]
    cdc_event_position: usize,
}
234
235impl Session {
236    /// Creates a new session with adaptive execution configuration.
237    #[cfg(feature = "lpg")]
238    #[allow(dead_code)] // Used when lpg enabled without triple-store
239    pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
240        let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
241        let graph_store_mut = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
242        Self {
243            store,
244            graph_store,
245            graph_store_mut,
246            catalog: cfg.catalog,
247            #[cfg(feature = "triple-store")]
248            rdf_store: Arc::new(RdfStore::new()),
249            transaction_manager: cfg.transaction_manager,
250            query_cache: cfg.query_cache,
251            current_transaction: parking_lot::Mutex::new(None),
252            read_only_tx: parking_lot::Mutex::new(cfg.read_only),
253            db_read_only: cfg.read_only,
254            identity: cfg.identity,
255            auto_commit: true,
256            adaptive_config: cfg.adaptive_config,
257            factorized_execution: cfg.factorized_execution,
258            graph_model: cfg.graph_model,
259            query_timeout: cfg.query_timeout,
260            max_property_size: cfg.max_property_size,
261            buffer_manager: cfg.buffer_manager,
262            commit_counter: cfg.commit_counter,
263            gc_interval: cfg.gc_interval,
264            transaction_start_node_count: AtomicUsize::new(0),
265            transaction_start_edge_count: AtomicUsize::new(0),
266            #[cfg(feature = "wal")]
267            wal: None,
268            #[cfg(feature = "wal")]
269            wal_graph_context: None,
270            #[cfg(feature = "cdc")]
271            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
272            #[cfg(feature = "cdc")]
273            cdc_pending_events: None,
274            current_graph: parking_lot::Mutex::new(None),
275            current_schema: parking_lot::Mutex::new(None),
276            time_zone: parking_lot::Mutex::new(None),
277            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
278            viewing_epoch_override: parking_lot::Mutex::new(None),
279            savepoints: parking_lot::Mutex::new(Vec::new()),
280            transaction_nesting_depth: parking_lot::Mutex::new(0),
281            touched_graphs: parking_lot::Mutex::new(Vec::new()),
282            #[cfg(feature = "metrics")]
283            metrics: None,
284            #[cfg(feature = "metrics")]
285            tx_start_time: parking_lot::Mutex::new(None),
286            projections: cfg.projections,
287        }
288    }
289
290    /// Overrides the graph store and write store used by the query engine.
291    ///
292    /// Used by the layered store integration: the session's `store` field is
293    /// the overlay `LpgStore` (for MVCC), but reads and writes should route
294    /// through the `LayeredStore` (which merges base + overlay).
295    pub(crate) fn override_stores(
296        &mut self,
297        read_store: Arc<dyn GraphStore>,
298        write_store: Option<Arc<dyn GraphStoreMut>>,
299    ) {
300        self.graph_store = read_store;
301        self.graph_store_mut = write_store;
302    }
303
304    /// Sets the WAL for this session (shared with the database).
305    ///
306    /// This also wraps `graph_store` in a [`WalGraphStore`] so that mutation
307    /// operators (INSERT, DELETE, SET via queries) log to the WAL.
308    #[cfg(all(feature = "wal", feature = "lpg"))]
309    pub(crate) fn set_wal(
310        &mut self,
311        wal: Arc<grafeo_storage::wal::LpgWal>,
312        wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
313    ) {
314        // Wrap the graph store so query-engine mutations are WAL-logged
315        let wal_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
316            Arc::clone(&self.store),
317            Arc::clone(&wal),
318            Arc::clone(&wal_graph_context),
319        ));
320        self.graph_store = Arc::clone(&wal_store) as Arc<dyn GraphStore>;
321        self.graph_store_mut = Some(wal_store as Arc<dyn GraphStoreMut>);
322        self.wal = Some(wal);
323        self.wal_graph_context = Some(wal_graph_context);
324    }
325
326    /// Sets the CDC log for this session (shared with the database).
327    ///
328    /// Wraps the current write store with a `CdcGraphStore` decorator so
329    /// that all session mutations (INSERT, SET, DELETE via query execution)
330    /// buffer CDC events. The buffer is flushed to the `CdcLog` on commit
331    /// and discarded on rollback.
332    #[cfg(feature = "cdc")]
333    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
334        // Wrap the WRITE store only with CdcGraphStore to intercept mutations.
335        // The read store (self.graph_store) is left unchanged for zero read overhead.
336        if let Some(ref write_store) = self.graph_store_mut {
337            let cdc_store = Arc::new(crate::database::cdc_store::CdcGraphStore::new(
338                Arc::clone(write_store),
339                Arc::clone(&cdc_log),
340            ));
341            self.cdc_pending_events = Some(cdc_store.pending_events());
342            self.graph_store_mut = Some(cdc_store as Arc<dyn grafeo_core::graph::GraphStoreMut>);
343        }
344        self.cdc_log = cdc_log;
345    }
346
347    /// Sets the metrics registry for this session (shared with the database).
348    #[cfg(feature = "metrics")]
349    pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
350        self.metrics = Some(metrics);
351    }
352
353    /// Creates a session backed by an external graph store.
354    ///
355    /// The external store handles all data operations. Transaction management
356    /// (begin/commit/rollback) is not supported for external stores.
357    ///
358    /// # Errors
359    ///
360    /// Returns an error if the internal arena allocation fails (out of memory).
361    pub(crate) fn with_external_store(
362        read_store: Arc<dyn GraphStore>,
363        write_store: Option<Arc<dyn GraphStoreMut>>,
364        cfg: SessionConfig,
365    ) -> Result<Self> {
366        Ok(Self {
367            #[cfg(feature = "lpg")]
368            store: Arc::new(LpgStore::new()?),
369            graph_store: read_store,
370            graph_store_mut: write_store,
371            catalog: cfg.catalog,
372            #[cfg(feature = "triple-store")]
373            rdf_store: Arc::new(RdfStore::new()),
374            transaction_manager: cfg.transaction_manager,
375            query_cache: cfg.query_cache,
376            current_transaction: parking_lot::Mutex::new(None),
377            read_only_tx: parking_lot::Mutex::new(cfg.read_only),
378            db_read_only: cfg.read_only,
379            identity: cfg.identity,
380            auto_commit: true,
381            adaptive_config: cfg.adaptive_config,
382            factorized_execution: cfg.factorized_execution,
383            graph_model: cfg.graph_model,
384            query_timeout: cfg.query_timeout,
385            max_property_size: cfg.max_property_size,
386            buffer_manager: cfg.buffer_manager,
387            commit_counter: cfg.commit_counter,
388            gc_interval: cfg.gc_interval,
389            transaction_start_node_count: AtomicUsize::new(0),
390            transaction_start_edge_count: AtomicUsize::new(0),
391            #[cfg(feature = "wal")]
392            wal: None,
393            #[cfg(feature = "wal")]
394            wal_graph_context: None,
395            #[cfg(feature = "cdc")]
396            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
397            #[cfg(feature = "cdc")]
398            cdc_pending_events: None,
399            current_graph: parking_lot::Mutex::new(None),
400            current_schema: parking_lot::Mutex::new(None),
401            time_zone: parking_lot::Mutex::new(None),
402            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
403            viewing_epoch_override: parking_lot::Mutex::new(None),
404            savepoints: parking_lot::Mutex::new(Vec::new()),
405            transaction_nesting_depth: parking_lot::Mutex::new(0),
406            touched_graphs: parking_lot::Mutex::new(Vec::new()),
407            #[cfg(feature = "metrics")]
408            metrics: None,
409            #[cfg(feature = "metrics")]
410            tx_start_time: parking_lot::Mutex::new(None),
411            #[cfg(feature = "lpg")]
412            projections: cfg.projections,
413        })
414    }
415
416    /// Returns the graph model this session operates on.
417    #[must_use]
418    pub fn graph_model(&self) -> GraphModel {
419        self.graph_model
420    }
421
422    /// Returns the identity bound to this session.
423    #[must_use]
424    pub fn identity(&self) -> &crate::auth::Identity {
425        &self.identity
426    }
427
428    // === Session State Management ===
429
430    /// Sets the current graph for this session (USE GRAPH).
431    pub fn use_graph(&self, name: &str) {
432        *self.current_graph.lock() = Some(name.to_string());
433    }
434
435    /// Returns the current graph name, if any.
436    #[must_use]
437    pub fn current_graph(&self) -> Option<String> {
438        self.current_graph.lock().clone()
439    }
440
441    /// Sets the current schema for this session (SESSION SET SCHEMA).
442    ///
443    /// Per ISO/IEC 39075 Section 7.1 GR1, this is independent of the session graph.
444    pub fn set_schema(&self, name: &str) {
445        *self.current_schema.lock() = Some(name.to_string());
446    }
447
448    /// Returns the current schema name, if any.
449    ///
450    /// `None` means "not set", which resolves to the default schema.
451    #[must_use]
452    pub fn current_schema(&self) -> Option<String> {
453        self.current_schema.lock().clone()
454    }
455
456    /// Computes the effective storage key for a graph, accounting for schema context.
457    ///
458    /// Per ISO/IEC 39075 Section 17.2, graphs resolve relative to the current schema.
459    /// Uses `/` as separator since it is invalid in GQL identifiers.
460    fn effective_graph_key(&self, graph_name: &str) -> String {
461        let schema = self.current_schema.lock().clone();
462        match schema {
463            Some(s) => format!("{s}/{graph_name}"),
464            None => graph_name.to_string(),
465        }
466    }
467
468    /// Computes the effective storage key for a type, accounting for schema context.
469    ///
470    /// Mirrors `effective_graph_key()`: types resolve relative to the current schema.
471    fn effective_type_key(&self, type_name: &str) -> String {
472        let schema = self.current_schema.lock().clone();
473        match schema {
474            Some(s) => format!("{s}/{type_name}"),
475            None => type_name.to_string(),
476        }
477    }
478
479    /// Returns the effective storage key for the current graph, accounting for schema.
480    ///
481    /// Combines `current_schema` and `current_graph` into a flat lookup key.
482    fn active_graph_storage_key(&self) -> Option<String> {
483        let graph = self.current_graph.lock().clone();
484        let schema = self.current_schema.lock().clone();
485        match (&schema, &graph) {
486            (None, None) => None,
487            (Some(s), None) => Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}")),
488            (None, Some(name)) if name.eq_ignore_ascii_case("default") => None,
489            (Some(s), Some(name)) if name.eq_ignore_ascii_case("default") => {
490                Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}"))
491            }
492            (None, Some(name)) => Some(name.clone()),
493            (Some(s), Some(g)) => Some(format!("{s}/{g}")),
494        }
495    }
496
    /// Returns the graph store for the currently active graph.
    ///
    /// When the active storage key is `None` (no schema set and the graph is
    /// unset or `"default"`), returns the session's default `graph_store`
    /// (already WAL-wrapped for the default graph).
    /// Otherwise looks up the keyed graph in the root store and, if both the
    /// WAL and its graph context are attached, wraps it in a [`WalGraphStore`]
    /// so mutations are WAL-logged with the correct graph context.
    ///
    /// Falls back to the default `graph_store` when the keyed graph does not
    /// exist, and always when the `lpg` feature is disabled.
    fn active_store(&self) -> Arc<dyn GraphStore> {
        let key = self.active_graph_storage_key();
        match key {
            None => Arc::clone(&self.graph_store),
            #[cfg(feature = "lpg")]
            Some(ref name) => match self.store.graph(name) {
                Some(named_store) => {
                    // With a WAL attached, hand out a wrapper tagged with this graph's key.
                    #[cfg(feature = "wal")]
                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
                        return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
                            named_store,
                            Arc::clone(wal),
                            name.clone(),
                            Arc::clone(ctx),
                        )) as Arc<dyn GraphStore>;
                    }
                    named_store as Arc<dyn GraphStore>
                }
                // Unknown graph key: fall back to the default store.
                None => Arc::clone(&self.graph_store),
            },
            #[cfg(not(feature = "lpg"))]
            Some(_) => Arc::clone(&self.graph_store),
        }
    }
528
529    /// Returns the writable store for the active graph, if available.
530    ///
531    /// Returns `None` for read-only databases. For named graphs, wraps
532    /// the store with WAL logging when durability is enabled.
533    fn active_write_store(&self) -> Option<Arc<dyn GraphStoreMut>> {
534        let key = self.active_graph_storage_key();
535        match key {
536            None => self.graph_store_mut.as_ref().map(Arc::clone),
537            #[cfg(feature = "lpg")]
538            Some(ref name) => match self.store.graph(name) {
539                Some(named_store) => {
540                    let mut store: Arc<dyn GraphStoreMut> = named_store;
541
542                    #[cfg(feature = "wal")]
543                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
544                        store = Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
545                            // WAL needs Arc<LpgStore>, get it fresh
546                            self.store
547                                .graph(name)
548                                .unwrap_or_else(|| Arc::clone(&self.store)),
549                            Arc::clone(wal),
550                            name.clone(),
551                            Arc::clone(ctx),
552                        ));
553                    }
554
555                    #[cfg(feature = "cdc")]
556                    if let Some(ref pending) = self.cdc_pending_events {
557                        store = Arc::new(crate::database::cdc_store::CdcGraphStore::wrap(
558                            store,
559                            Arc::clone(&self.cdc_log),
560                            Arc::clone(pending),
561                        ));
562                    }
563
564                    Some(store)
565                }
566                None => self.graph_store_mut.as_ref().map(Arc::clone),
567            },
568            #[cfg(not(feature = "lpg"))]
569            Some(_) => self.graph_store_mut.as_ref().map(Arc::clone),
570        }
571    }
572
573    /// Returns the concrete `LpgStore` for the currently active graph.
574    ///
575    /// Used by direct CRUD methods that need the concrete store type
576    /// for versioned operations.
577    #[cfg(feature = "lpg")]
578    fn active_lpg_store(&self) -> Arc<LpgStore> {
579        let key = self.active_graph_storage_key();
580        match key {
581            None => Arc::clone(&self.store),
582            Some(ref name) => self
583                .store
584                .graph(name)
585                .unwrap_or_else(|| Arc::clone(&self.store)),
586        }
587    }
588
589    /// Resolves a graph name to a concrete `LpgStore`.
590    /// `None` and `"default"` resolve to the session's root store.
591    #[cfg(feature = "lpg")]
592    fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
593        match graph_name {
594            None => Arc::clone(&self.store),
595            Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
596            Some(name) => self
597                .store
598                .graph(name)
599                .unwrap_or_else(|| Arc::clone(&self.store)),
600        }
601    }
602
603    /// Records the current graph as "touched" if a transaction is active.
604    ///
605    /// Uses the full storage key (schema/graph) so that commit/rollback
606    /// can resolve the correct store via `resolve_store`.
607    fn track_graph_touch(&self) {
608        if self.current_transaction.lock().is_some() {
609            let key = self.active_graph_storage_key();
610            let mut touched = self.touched_graphs.lock();
611            if !touched.contains(&key) {
612                touched.push(key);
613            }
614        }
615    }
616
617    /// Sets the session time zone.
618    pub fn set_time_zone(&self, tz: &str) {
619        *self.time_zone.lock() = Some(tz.to_string());
620    }
621
622    /// Returns the session time zone, if set.
623    #[must_use]
624    pub fn time_zone(&self) -> Option<String> {
625        self.time_zone.lock().clone()
626    }
627
628    /// Sets a session parameter.
629    pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
630        self.session_params.lock().insert(key.to_string(), value);
631    }
632
633    /// Gets a session parameter by cloning it.
634    #[must_use]
635    pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
636        self.session_params.lock().get(key).cloned()
637    }
638
639    /// Resets all session state to defaults (ISO/IEC 39075 Section 7.2).
640    pub fn reset_session(&self) {
641        *self.current_schema.lock() = None;
642        *self.current_graph.lock() = None;
643        *self.time_zone.lock() = None;
644        self.session_params.lock().clear();
645        *self.viewing_epoch_override.lock() = None;
646    }
647
648    /// Resets only the session schema (Section 7.2 GR1).
649    pub fn reset_schema(&self) {
650        *self.current_schema.lock() = None;
651    }
652
653    /// Resets only the session graph (Section 7.2 GR2).
654    pub fn reset_graph(&self) {
655        *self.current_graph.lock() = None;
656    }
657
658    /// Resets only the session time zone (Section 7.2 GR3).
659    pub fn reset_time_zone(&self) {
660        *self.time_zone.lock() = None;
661    }
662
663    /// Resets only session parameters (Section 7.2 GR4).
664    pub fn reset_parameters(&self) {
665        self.session_params.lock().clear();
666    }
667
668    // --- Time-travel API ---
669
670    /// Sets a viewing epoch override for time-travel queries.
671    ///
672    /// While set, all queries on this session see the database as it existed
673    /// at the given epoch. Use [`clear_viewing_epoch`](Self::clear_viewing_epoch)
674    /// to return to normal behavior.
675    pub fn set_viewing_epoch(&self, epoch: EpochId) {
676        *self.viewing_epoch_override.lock() = Some(epoch);
677    }
678
679    /// Clears the viewing epoch override, returning to normal behavior.
680    pub fn clear_viewing_epoch(&self) {
681        *self.viewing_epoch_override.lock() = None;
682    }
683
684    /// Returns the current viewing epoch override, if any.
685    #[must_use]
686    pub fn viewing_epoch(&self) -> Option<EpochId> {
687        *self.viewing_epoch_override.lock()
688    }
689
690    /// Returns all versions of a node with their creation/deletion epochs.
691    ///
692    /// Properties and labels reflect the current state (not versioned per-epoch).
693    #[cfg(feature = "lpg")]
694    #[must_use]
695    pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
696        self.active_lpg_store().get_node_history(id)
697    }
698
699    /// Returns all versions of an edge with their creation/deletion epochs.
700    ///
701    /// Properties reflect the current state (not versioned per-epoch).
702    #[cfg(feature = "lpg")]
703    #[must_use]
704    pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
705        self.active_lpg_store().get_edge_history(id)
706    }
707
708    /// Checks that the session's graph model supports LPG operations.
709    fn require_lpg(&self, language: &str) -> Result<()> {
710        if self.graph_model == GraphModel::Rdf {
711            return Err(grafeo_common::utils::error::Error::Internal(format!(
712                "This is an RDF database. {language} queries require an LPG database."
713            )));
714        }
715        Ok(())
716    }
717
718    /// Checks that the session's identity is permitted to execute the given
719    /// statement kind. Returns an error if the role is insufficient.
720    ///
721    /// Short-circuits for Admin identities (the common case for embedded use
722    /// and benchmarks) to avoid any overhead on the hot path.
723    #[inline]
724    fn require_permission(&self, kind: crate::auth::StatementKind) -> Result<()> {
725        // Fast path: Admin can do everything
726        if self.identity.can_admin() {
727            return Ok(());
728        }
729        crate::auth::check_permission(&self.identity, kind).map_err(|denied| {
730            grafeo_common::utils::error::Error::Query(grafeo_common::utils::error::QueryError::new(
731                grafeo_common::utils::error::QueryErrorKind::Semantic,
732                denied.to_string(),
733            ))
734        })
735    }
736
737    /// Executes a session or transaction command, returning an empty result.
738    #[cfg(feature = "gql")]
739    fn execute_session_command(
740        &self,
741        cmd: grafeo_adapters::query::gql::ast::SessionCommand,
742    ) -> Result<QueryResult> {
743        use grafeo_adapters::query::gql::ast::SessionCommand;
744        #[cfg(feature = "lpg")]
745        use grafeo_adapters::query::gql::ast::TransactionIsolationLevel;
746        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
747
748        // Check role-based permission for graph management commands
749        match &cmd {
750            SessionCommand::CreateGraph { .. }
751            | SessionCommand::DropGraph { .. }
752            | SessionCommand::CreateProjection { .. }
753            | SessionCommand::DropProjection { .. } => {
754                self.require_permission(crate::auth::StatementKind::Write)?;
755            }
756            _ => {} // Session state + transaction control: always allowed
757        }
758
759        // Check per-graph grants for graph-scoped commands
760        if self.identity.has_grants() {
761            match &cmd {
762                SessionCommand::CreateGraph { name, .. }
763                | SessionCommand::DropGraph { name, .. } => {
764                    if !self
765                        .identity
766                        .can_access_graph(name, crate::auth::Role::ReadWrite)
767                    {
768                        return Err(Error::Query(QueryError::new(
769                            QueryErrorKind::Semantic,
770                            format!(
771                                "permission denied: no grant for graph '{name}' (user: {})",
772                                self.identity.user_id()
773                            ),
774                        )));
775                    }
776                }
777                _ => {}
778            }
779        }
780
781        // Block DDL in read-only transactions (ISO/IEC 39075 Section 8)
782        if *self.read_only_tx.lock() {
783            match &cmd {
784                SessionCommand::CreateGraph { .. }
785                | SessionCommand::DropGraph { .. }
786                | SessionCommand::CreateProjection { .. }
787                | SessionCommand::DropProjection { .. } => {
788                    return Err(Error::Transaction(
789                        grafeo_common::utils::error::TransactionError::ReadOnly,
790                    ));
791                }
792                _ => {} // Session state + transaction control allowed
793            }
794        }
795
796        match cmd {
797            #[cfg(feature = "lpg")]
798            SessionCommand::CreateGraph {
799                name,
800                if_not_exists,
801                typed,
802                like_graph,
803                copy_of,
804                open: _,
805            } => {
806                // ISO/IEC 39075 Section 12.4: graphs are created within the current schema
807                let storage_key = self.effective_graph_key(&name);
808
809                // Validate source graph exists for LIKE / AS COPY OF
810                if let Some(ref src) = like_graph {
811                    let src_key = self.effective_graph_key(src);
812                    if self.store.graph(&src_key).is_none() {
813                        return Err(Error::Query(QueryError::new(
814                            QueryErrorKind::Semantic,
815                            format!("Source graph '{src}' does not exist"),
816                        )));
817                    }
818                }
819                if let Some(ref src) = copy_of {
820                    let src_key = self.effective_graph_key(src);
821                    if self.store.graph(&src_key).is_none() {
822                        return Err(Error::Query(QueryError::new(
823                            QueryErrorKind::Semantic,
824                            format!("Source graph '{src}' does not exist"),
825                        )));
826                    }
827                }
828
829                let created = self
830                    .store
831                    .create_graph(&storage_key)
832                    .map_err(|e| Error::Internal(e.to_string()))?;
833                if !created && !if_not_exists {
834                    return Err(Error::Query(QueryError::new(
835                        QueryErrorKind::Semantic,
836                        format!("Graph '{name}' already exists"),
837                    )));
838                }
839                if created {
840                    #[cfg(feature = "wal")]
841                    self.log_schema_wal(&grafeo_storage::wal::WalRecord::CreateNamedGraph {
842                        name: storage_key.clone(),
843                    });
844                }
845
846                // AS COPY OF: copy data from source graph
847                if let Some(ref src) = copy_of {
848                    let src_key = self.effective_graph_key(src);
849                    self.store
850                        .copy_graph(Some(&src_key), Some(&storage_key))
851                        .map_err(|e| Error::Internal(e.to_string()))?;
852                }
853
854                // Bind to graph type if specified.
855                // If the parser produced a '/' in the name it is already a qualified
856                // "schema/type" key; otherwise resolve against the current schema.
857                if let Some(type_name) = typed
858                    && let Err(e) = self.catalog.bind_graph_type(
859                        &storage_key,
860                        if type_name.contains('/') {
861                            type_name.clone()
862                        } else {
863                            self.effective_type_key(&type_name)
864                        },
865                    )
866                {
867                    return Err(Error::Query(QueryError::new(
868                        QueryErrorKind::Semantic,
869                        e.to_string(),
870                    )));
871                }
872
873                // LIKE: copy graph type binding from source
874                if let Some(ref src) = like_graph {
875                    let src_key = self.effective_graph_key(src);
876                    if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
877                        let _ = self.catalog.bind_graph_type(&storage_key, src_type);
878                    }
879                }
880
881                Ok(QueryResult::empty())
882            }
883            #[cfg(feature = "lpg")]
884            SessionCommand::DropGraph { name, if_exists } => {
885                let storage_key = self.effective_graph_key(&name);
886                let dropped = self.store.drop_graph(&storage_key);
887                if !dropped && !if_exists {
888                    return Err(Error::Query(QueryError::new(
889                        QueryErrorKind::Semantic,
890                        format!("Graph '{name}' does not exist"),
891                    )));
892                }
893                if dropped {
894                    #[cfg(feature = "wal")]
895                    self.log_schema_wal(&grafeo_storage::wal::WalRecord::DropNamedGraph {
896                        name: storage_key.clone(),
897                    });
898                    // If this session was using the dropped graph, reset to default
899                    let mut current = self.current_graph.lock();
900                    if current
901                        .as_deref()
902                        .is_some_and(|g| g.eq_ignore_ascii_case(&name))
903                    {
904                        *current = None;
905                    }
906                }
907                Ok(QueryResult::empty())
908            }
909            #[cfg(feature = "lpg")]
910            SessionCommand::UseGraph(name) => {
911                // Check per-graph grant before switching
912                if self.identity.has_grants()
913                    && !name.eq_ignore_ascii_case("default")
914                    && !self
915                        .identity
916                        .can_access_graph(&name, crate::auth::Role::ReadOnly)
917                {
918                    return Err(Error::Query(QueryError::new(
919                        QueryErrorKind::Semantic,
920                        format!(
921                            "permission denied: no grant for graph '{name}' (user: {})",
922                            self.identity.user_id()
923                        ),
924                    )));
925                }
926                // Verify graph exists (resolve within current schema)
927                let effective_key = self.effective_graph_key(&name);
928                if !name.eq_ignore_ascii_case("default")
929                    && self.store.graph(&effective_key).is_none()
930                {
931                    return Err(Error::Query(QueryError::new(
932                        QueryErrorKind::Semantic,
933                        format!("Graph '{name}' does not exist"),
934                    )));
935                }
936                self.use_graph(&name);
937                // Track the new graph if in a transaction
938                self.track_graph_touch();
939                Ok(QueryResult::empty())
940            }
941            #[cfg(feature = "lpg")]
942            SessionCommand::SessionSetGraph(name) => {
943                // ISO/IEC 39075 Section 7.1 GR2: set session graph (resolved within current schema)
944                // Check per-graph grant before switching (same as USE GRAPH)
945                if self.identity.has_grants()
946                    && !name.eq_ignore_ascii_case("default")
947                    && !self
948                        .identity
949                        .can_access_graph(&name, crate::auth::Role::ReadOnly)
950                {
951                    return Err(Error::Query(QueryError::new(
952                        QueryErrorKind::Semantic,
953                        format!(
954                            "permission denied: no grant for graph '{name}' (user: {})",
955                            self.identity.user_id()
956                        ),
957                    )));
958                }
959                let effective_key = self.effective_graph_key(&name);
960                if !name.eq_ignore_ascii_case("default")
961                    && self.store.graph(&effective_key).is_none()
962                {
963                    return Err(Error::Query(QueryError::new(
964                        QueryErrorKind::Semantic,
965                        format!("Graph '{name}' does not exist"),
966                    )));
967                }
968                self.use_graph(&name);
969                // Track the new graph if in a transaction
970                self.track_graph_touch();
971                Ok(QueryResult::empty())
972            }
973            SessionCommand::SessionSetSchema(name) => {
974                // ISO/IEC 39075 Section 7.1 GR1: set session schema (independent of graph)
975                if !self.catalog.schema_exists(&name) {
976                    return Err(Error::Query(QueryError::new(
977                        QueryErrorKind::Semantic,
978                        format!("Schema '{name}' does not exist"),
979                    )));
980                }
981                self.set_schema(&name);
982                Ok(QueryResult::empty())
983            }
984            SessionCommand::SessionSetTimeZone(tz) => {
985                self.set_time_zone(&tz);
986                Ok(QueryResult::empty())
987            }
988            #[cfg(feature = "gql")]
989            SessionCommand::SessionSetParameter(key, expr) => {
990                if key.eq_ignore_ascii_case("viewing_epoch") {
991                    match Self::eval_integer_literal(&expr) {
992                        Some(n) if n >= 0 => {
993                            // reason: guard ensures n >= 0
994                            #[allow(clippy::cast_sign_loss)]
995                            let epoch = n as u64;
996                            self.set_viewing_epoch(EpochId::new(epoch));
997                            Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
998                        }
999                        _ => Err(Error::Query(QueryError::new(
1000                            QueryErrorKind::Semantic,
1001                            "viewing_epoch must be a non-negative integer literal",
1002                        ))),
1003                    }
1004                } else {
1005                    // For now, store parameter name with Null value.
1006                    // Full expression evaluation would require building and executing a plan.
1007                    self.set_parameter(&key, Value::Null);
1008                    Ok(QueryResult::empty())
1009                }
1010            }
1011            SessionCommand::SessionReset(target) => {
1012                use grafeo_adapters::query::gql::ast::SessionResetTarget;
1013                match target {
1014                    SessionResetTarget::All => self.reset_session(),
1015                    SessionResetTarget::Schema => self.reset_schema(),
1016                    SessionResetTarget::Graph => self.reset_graph(),
1017                    SessionResetTarget::TimeZone => self.reset_time_zone(),
1018                    SessionResetTarget::Parameters => self.reset_parameters(),
1019                }
1020                Ok(QueryResult::empty())
1021            }
1022            SessionCommand::SessionClose => {
1023                self.reset_session();
1024                Ok(QueryResult::empty())
1025            }
1026            #[cfg(feature = "lpg")]
1027            SessionCommand::StartTransaction {
1028                read_only,
1029                isolation_level,
1030            } => {
1031                let engine_level = isolation_level.map(|l| match l {
1032                    TransactionIsolationLevel::ReadCommitted => {
1033                        crate::transaction::IsolationLevel::ReadCommitted
1034                    }
1035                    TransactionIsolationLevel::SnapshotIsolation => {
1036                        crate::transaction::IsolationLevel::SnapshotIsolation
1037                    }
1038                    TransactionIsolationLevel::Serializable => {
1039                        crate::transaction::IsolationLevel::Serializable
1040                    }
1041                });
1042                self.begin_transaction_inner(read_only, engine_level)?;
1043                Ok(QueryResult::status("Transaction started"))
1044            }
1045            #[cfg(feature = "lpg")]
1046            SessionCommand::Commit => {
1047                self.commit_inner()?;
1048                Ok(QueryResult::status("Transaction committed"))
1049            }
1050            #[cfg(feature = "lpg")]
1051            SessionCommand::Rollback => {
1052                self.rollback_inner()?;
1053                Ok(QueryResult::status("Transaction rolled back"))
1054            }
1055            #[cfg(feature = "lpg")]
1056            SessionCommand::Savepoint(name) => {
1057                self.savepoint(&name)?;
1058                Ok(QueryResult::status(format!("Savepoint '{name}' created")))
1059            }
1060            #[cfg(feature = "lpg")]
1061            SessionCommand::RollbackToSavepoint(name) => {
1062                self.rollback_to_savepoint(&name)?;
1063                Ok(QueryResult::status(format!(
1064                    "Rolled back to savepoint '{name}'"
1065                )))
1066            }
1067            #[cfg(feature = "lpg")]
1068            SessionCommand::ReleaseSavepoint(name) => {
1069                self.release_savepoint(&name)?;
1070                Ok(QueryResult::status(format!("Savepoint '{name}' released")))
1071            }
1072            #[cfg(feature = "lpg")]
1073            SessionCommand::CreateProjection {
1074                name,
1075                node_labels,
1076                edge_types,
1077            } => {
1078                use grafeo_core::graph::{GraphProjection, ProjectionSpec};
1079                use std::collections::hash_map::Entry;
1080
1081                let spec = ProjectionSpec::new()
1082                    .with_node_labels(node_labels)
1083                    .with_edge_types(edge_types);
1084
1085                let store = self.active_store();
1086                let projection = Arc::new(GraphProjection::new(store, spec));
1087                let mut projections = self.projections.write();
1088                match projections.entry(name.clone()) {
1089                    Entry::Occupied(_) => Err(Error::Query(QueryError::new(
1090                        QueryErrorKind::Semantic,
1091                        format!("Projection '{name}' already exists"),
1092                    ))),
1093                    Entry::Vacant(e) => {
1094                        e.insert(projection);
1095                        Ok(QueryResult::status(format!("Projection '{name}' created")))
1096                    }
1097                }
1098            }
1099            #[cfg(feature = "lpg")]
1100            SessionCommand::DropProjection { name } => {
1101                let removed = self.projections.write().remove(&name).is_some();
1102                if !removed {
1103                    return Err(Error::Query(QueryError::new(
1104                        QueryErrorKind::Semantic,
1105                        format!("Projection '{name}' does not exist"),
1106                    )));
1107                }
1108                Ok(QueryResult::status(format!("Projection '{name}' dropped")))
1109            }
1110            #[cfg(feature = "lpg")]
1111            SessionCommand::ShowProjections => {
1112                let mut names: Vec<String> = self.projections.read().keys().cloned().collect();
1113                names.sort();
1114                let rows: Vec<Vec<Value>> =
1115                    names.into_iter().map(|n| vec![Value::from(n)]).collect();
1116                Ok(QueryResult {
1117                    columns: vec!["name".to_string()],
1118                    column_types: Vec::new(),
1119                    rows,
1120                    ..QueryResult::empty()
1121                })
1122            }
1123            #[cfg(not(feature = "lpg"))]
1124            _ => Err(grafeo_common::utils::error::Error::Internal(
1125                "This command requires the `lpg` feature".to_string(),
1126            )),
1127        }
1128    }
1129
1130    /// Logs a WAL record for a schema change (no-op if WAL is not enabled).
1131    #[cfg(feature = "wal")]
1132    fn log_schema_wal(&self, record: &grafeo_storage::wal::WalRecord) {
1133        if let Some(ref wal) = self.wal
1134            && let Err(e) = wal.log(record)
1135        {
1136            grafeo_warn!("Failed to log schema change to WAL: {}", e);
1137        }
1138    }
1139
1140    /// Executes a schema DDL command, returning a status result.
1141    #[cfg(all(feature = "lpg", feature = "gql"))]
1142    fn execute_schema_command(
1143        &self,
1144        cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
1145    ) -> Result<QueryResult> {
1146        use crate::catalog::{
1147            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
1148        };
1149        use grafeo_adapters::query::gql::ast::SchemaStatement;
1150        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
1151        #[cfg(feature = "wal")]
1152        use grafeo_storage::wal::WalRecord;
1153
1154        /// Logs a WAL record for schema changes. Compiles to nothing without `wal`.
1155        macro_rules! wal_log {
1156            ($self:expr, $record:expr) => {
1157                #[cfg(feature = "wal")]
1158                $self.log_schema_wal(&$record);
1159            };
1160        }
1161
1162        let result = match cmd {
1163            SchemaStatement::CreateNodeType(stmt) => {
1164                let effective_name = self.effective_type_key(&stmt.name);
1165                #[cfg(feature = "wal")]
1166                let props_for_wal: Vec<(String, String, bool)> = stmt
1167                    .properties
1168                    .iter()
1169                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1170                    .collect();
1171                let def = NodeTypeDefinition {
1172                    name: effective_name.clone(),
1173                    properties: stmt
1174                        .properties
1175                        .iter()
1176                        .map(|p| TypedProperty {
1177                            name: p.name.clone(),
1178                            data_type: PropertyDataType::from_type_name(&p.data_type),
1179                            nullable: p.nullable,
1180                            default_value: p
1181                                .default_value
1182                                .as_ref()
1183                                .map(|s| parse_default_literal(s)),
1184                        })
1185                        .collect(),
1186                    constraints: Vec::new(),
1187                    parent_types: stmt.parent_types.clone(),
1188                };
1189                let result = if stmt.or_replace {
1190                    let _ = self.catalog.drop_node_type(&effective_name);
1191                    self.catalog.register_node_type(def)
1192                } else {
1193                    self.catalog.register_node_type(def)
1194                };
1195                match result {
1196                    Ok(()) => {
1197                        wal_log!(
1198                            self,
1199                            WalRecord::CreateNodeType {
1200                                name: effective_name.clone(),
1201                                properties: props_for_wal,
1202                                constraints: Vec::new(),
1203                            }
1204                        );
1205                        Ok(QueryResult::status(format!(
1206                            "Created node type '{}'",
1207                            stmt.name
1208                        )))
1209                    }
1210                    Err(e) if stmt.if_not_exists => {
1211                        let _ = e;
1212                        Ok(QueryResult::status("No change"))
1213                    }
1214                    Err(e) => Err(Error::Query(QueryError::new(
1215                        QueryErrorKind::Semantic,
1216                        e.to_string(),
1217                    ))),
1218                }
1219            }
1220            SchemaStatement::CreateEdgeType(stmt) => {
1221                let effective_name = self.effective_type_key(&stmt.name);
1222                #[cfg(feature = "wal")]
1223                let props_for_wal: Vec<(String, String, bool)> = stmt
1224                    .properties
1225                    .iter()
1226                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1227                    .collect();
1228                let def = EdgeTypeDefinition {
1229                    name: effective_name.clone(),
1230                    properties: stmt
1231                        .properties
1232                        .iter()
1233                        .map(|p| TypedProperty {
1234                            name: p.name.clone(),
1235                            data_type: PropertyDataType::from_type_name(&p.data_type),
1236                            nullable: p.nullable,
1237                            default_value: p
1238                                .default_value
1239                                .as_ref()
1240                                .map(|s| parse_default_literal(s)),
1241                        })
1242                        .collect(),
1243                    constraints: Vec::new(),
1244                    source_node_types: stmt.source_node_types.clone(),
1245                    target_node_types: stmt.target_node_types.clone(),
1246                };
1247                let result = if stmt.or_replace {
1248                    let _ = self.catalog.drop_edge_type_def(&effective_name);
1249                    self.catalog.register_edge_type_def(def)
1250                } else {
1251                    self.catalog.register_edge_type_def(def)
1252                };
1253                match result {
1254                    Ok(()) => {
1255                        wal_log!(
1256                            self,
1257                            WalRecord::CreateEdgeType {
1258                                name: effective_name.clone(),
1259                                properties: props_for_wal,
1260                                constraints: Vec::new(),
1261                            }
1262                        );
1263                        Ok(QueryResult::status(format!(
1264                            "Created edge type '{}'",
1265                            stmt.name
1266                        )))
1267                    }
1268                    Err(e) if stmt.if_not_exists => {
1269                        let _ = e;
1270                        Ok(QueryResult::status("No change"))
1271                    }
1272                    Err(e) => Err(Error::Query(QueryError::new(
1273                        QueryErrorKind::Semantic,
1274                        e.to_string(),
1275                    ))),
1276                }
1277            }
1278            SchemaStatement::CreateVectorIndex(stmt) => {
1279                Self::create_vector_index_on_store(
1280                    &self.active_lpg_store(),
1281                    &stmt.node_label,
1282                    &stmt.property,
1283                    stmt.dimensions,
1284                    stmt.metric.as_deref(),
1285                )?;
1286                wal_log!(
1287                    self,
1288                    WalRecord::CreateIndex {
1289                        name: stmt.name.clone(),
1290                        label: stmt.node_label.clone(),
1291                        property: stmt.property.clone(),
1292                        index_type: "vector".to_string(),
1293                    }
1294                );
1295                Ok(QueryResult::status(format!(
1296                    "Created vector index '{}'",
1297                    stmt.name
1298                )))
1299            }
1300            SchemaStatement::DropNodeType { name, if_exists } => {
1301                let effective_name = self.effective_type_key(&name);
1302                match self.catalog.drop_node_type(&effective_name) {
1303                    Ok(()) => {
1304                        wal_log!(
1305                            self,
1306                            WalRecord::DropNodeType {
1307                                name: effective_name
1308                            }
1309                        );
1310                        Ok(QueryResult::status(format!("Dropped node type '{name}'")))
1311                    }
1312                    Err(e) if if_exists => {
1313                        let _ = e;
1314                        Ok(QueryResult::status("No change"))
1315                    }
1316                    Err(e) => Err(Error::Query(QueryError::new(
1317                        QueryErrorKind::Semantic,
1318                        e.to_string(),
1319                    ))),
1320                }
1321            }
1322            SchemaStatement::DropEdgeType { name, if_exists } => {
1323                let effective_name = self.effective_type_key(&name);
1324                match self.catalog.drop_edge_type_def(&effective_name) {
1325                    Ok(()) => {
1326                        wal_log!(
1327                            self,
1328                            WalRecord::DropEdgeType {
1329                                name: effective_name
1330                            }
1331                        );
1332                        Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
1333                    }
1334                    Err(e) if if_exists => {
1335                        let _ = e;
1336                        Ok(QueryResult::status("No change"))
1337                    }
1338                    Err(e) => Err(Error::Query(QueryError::new(
1339                        QueryErrorKind::Semantic,
1340                        e.to_string(),
1341                    ))),
1342                }
1343            }
1344            SchemaStatement::CreateIndex(stmt) => {
1345                use crate::catalog::IndexType as CatalogIndexType;
1346                use grafeo_adapters::query::gql::ast::IndexKind;
1347                let active = self.active_lpg_store();
1348                let index_type_str = match stmt.index_kind {
1349                    IndexKind::Property => "property",
1350                    IndexKind::BTree => "btree",
1351                    IndexKind::Text => "text",
1352                    IndexKind::Vector => "vector",
1353                };
1354                match stmt.index_kind {
1355                    IndexKind::Property | IndexKind::BTree => {
1356                        for prop in &stmt.properties {
1357                            active.create_property_index(prop);
1358                        }
1359                    }
1360                    IndexKind::Text => {
1361                        for prop in &stmt.properties {
1362                            Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1363                        }
1364                    }
1365                    IndexKind::Vector => {
1366                        for prop in &stmt.properties {
1367                            Self::create_vector_index_on_store(
1368                                &active,
1369                                &stmt.label,
1370                                prop,
1371                                stmt.options.dimensions,
1372                                stmt.options.metric.as_deref(),
1373                            )?;
1374                        }
1375                    }
1376                }
1377                // Register each property index in the catalog for SHOW INDEXES
1378                // and DROP INDEX name-based lookup.
1379                let catalog_index_type = match stmt.index_kind {
1380                    IndexKind::Property => CatalogIndexType::Hash,
1381                    IndexKind::BTree => CatalogIndexType::BTree,
1382                    IndexKind::Text => CatalogIndexType::FullText,
1383                    IndexKind::Vector => CatalogIndexType::Hash,
1384                };
1385                let label_id = self.catalog.get_or_create_label(&stmt.label);
1386                for prop in &stmt.properties {
1387                    let prop_id = self.catalog.get_or_create_property_key(prop);
1388                    self.catalog
1389                        .create_index(&stmt.name, label_id, prop_id, catalog_index_type);
1390                }
1391                #[cfg(feature = "wal")]
1392                for prop in &stmt.properties {
1393                    wal_log!(
1394                        self,
1395                        WalRecord::CreateIndex {
1396                            name: stmt.name.clone(),
1397                            label: stmt.label.clone(),
1398                            property: prop.clone(),
1399                            index_type: index_type_str.to_string(),
1400                        }
1401                    );
1402                }
1403                Ok(QueryResult::status(format!(
1404                    "Created {} index '{}'",
1405                    index_type_str, stmt.name
1406                )))
1407            }
1408            SchemaStatement::DropIndex { name, if_exists } => {
1409                // Look up the index by name in the catalog to find the
1410                // underlying property, then drop from both catalog and store.
1411                if let Some(index_id) = self.catalog.find_index_by_name(&name) {
1412                    let def = self.catalog.get_index(index_id);
1413                    self.catalog.drop_index(index_id);
1414                    if let Some(def) = def
1415                        && let Some(prop_name) =
1416                            self.catalog.get_property_key_name(def.property_key)
1417                    {
1418                        self.active_lpg_store().drop_property_index(&prop_name);
1419                    }
1420                    wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1421                    Ok(QueryResult::status(format!("Dropped index '{name}'")))
1422                } else if if_exists {
1423                    Ok(QueryResult::status("No change".to_string()))
1424                } else {
1425                    Err(Error::Query(QueryError::new(
1426                        QueryErrorKind::Semantic,
1427                        format!("Index '{name}' does not exist"),
1428                    )))
1429                }
1430            }
1431            SchemaStatement::CreateConstraint(stmt) => {
1432                use crate::catalog::TypeConstraint;
1433                use grafeo_adapters::query::gql::ast::ConstraintKind;
1434                let kind_str = match stmt.constraint_kind {
1435                    ConstraintKind::Unique => "unique",
1436                    ConstraintKind::NodeKey => "node_key",
1437                    ConstraintKind::NotNull => "not_null",
1438                    ConstraintKind::Exists => "exists",
1439                };
1440                let constraint_name = stmt
1441                    .name
1442                    .clone()
1443                    .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1444
1445                // Register constraint in catalog type definitions
1446                match stmt.constraint_kind {
1447                    ConstraintKind::Unique => {
1448                        for prop in &stmt.properties {
1449                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1450                            let prop_id = self.catalog.get_or_create_property_key(prop);
1451                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1452                        }
1453                        let _ = self.catalog.add_constraint_to_type(
1454                            &stmt.label,
1455                            TypeConstraint::Unique(stmt.properties.clone()),
1456                        );
1457                    }
1458                    ConstraintKind::NodeKey => {
1459                        for prop in &stmt.properties {
1460                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1461                            let prop_id = self.catalog.get_or_create_property_key(prop);
1462                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1463                            let _ = self.catalog.add_required_property(label_id, prop_id);
1464                        }
1465                        let _ = self.catalog.add_constraint_to_type(
1466                            &stmt.label,
1467                            TypeConstraint::PrimaryKey(stmt.properties.clone()),
1468                        );
1469                    }
1470                    ConstraintKind::NotNull | ConstraintKind::Exists => {
1471                        for prop in &stmt.properties {
1472                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1473                            let prop_id = self.catalog.get_or_create_property_key(prop);
1474                            let _ = self.catalog.add_required_property(label_id, prop_id);
1475                            let _ = self.catalog.add_constraint_to_type(
1476                                &stmt.label,
1477                                TypeConstraint::NotNull(prop.clone()),
1478                            );
1479                        }
1480                    }
1481                }
1482
1483                wal_log!(
1484                    self,
1485                    WalRecord::CreateConstraint {
1486                        name: constraint_name.clone(),
1487                        label: stmt.label.clone(),
1488                        properties: stmt.properties.clone(),
1489                        kind: kind_str.to_string(),
1490                    }
1491                );
1492                Ok(QueryResult::status(format!(
1493                    "Created {kind_str} constraint '{constraint_name}'"
1494                )))
1495            }
1496            SchemaStatement::DropConstraint { name, if_exists } => {
1497                let _ = if_exists;
1498                wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1499                Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1500            }
1501            SchemaStatement::CreateGraphType(stmt) => {
1502                use crate::catalog::GraphTypeDefinition;
1503                use grafeo_adapters::query::gql::ast::InlineElementType;
1504
1505                let effective_name = self.effective_type_key(&stmt.name);
1506
1507                // GG04: LIKE clause copies type from existing graph
1508                let (mut node_types, mut edge_types, open) =
1509                    if let Some(ref like_graph) = stmt.like_graph {
1510                        // Infer types from the graph's bound type, or use its existing types
1511                        if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1512                            if let Some(existing) = self
1513                                .catalog
1514                                .schema()
1515                                .and_then(|s| s.get_graph_type(&type_name))
1516                            {
1517                                (
1518                                    existing.allowed_node_types.clone(),
1519                                    existing.allowed_edge_types.clone(),
1520                                    existing.open,
1521                                )
1522                            } else {
1523                                (Vec::new(), Vec::new(), true)
1524                            }
1525                        } else {
1526                            // GG22: Infer from graph data (labels used in graph)
1527                            let nt = self.catalog.all_node_type_names();
1528                            let et = self.catalog.all_edge_type_names();
1529                            if nt.is_empty() && et.is_empty() {
1530                                (Vec::new(), Vec::new(), true)
1531                            } else {
1532                                (nt, et, false)
1533                            }
1534                        }
1535                    } else {
1536                        // Prefix element type names with schema for consistency
1537                        let nt = stmt
1538                            .node_types
1539                            .iter()
1540                            .map(|n| self.effective_type_key(n))
1541                            .collect();
1542                        let et = stmt
1543                            .edge_types
1544                            .iter()
1545                            .map(|n| self.effective_type_key(n))
1546                            .collect();
1547                        (nt, et, stmt.open)
1548                    };
1549
1550                // GG03: Register inline element types and add their names
1551                for inline in &stmt.inline_types {
1552                    match inline {
1553                        InlineElementType::Node {
1554                            name,
1555                            properties,
1556                            key_labels,
1557                            ..
1558                        } => {
1559                            let inline_effective = self.effective_type_key(name);
1560                            let def = NodeTypeDefinition {
1561                                name: inline_effective.clone(),
1562                                properties: properties
1563                                    .iter()
1564                                    .map(|p| TypedProperty {
1565                                        name: p.name.clone(),
1566                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1567                                        nullable: p.nullable,
1568                                        default_value: None,
1569                                    })
1570                                    .collect(),
1571                                constraints: Vec::new(),
1572                                parent_types: key_labels.clone(),
1573                            };
1574                            // Register or replace so inline defs override existing
1575                            self.catalog.register_or_replace_node_type(def);
1576                            #[cfg(feature = "wal")]
1577                            {
1578                                let props_for_wal: Vec<(String, String, bool)> = properties
1579                                    .iter()
1580                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1581                                    .collect();
1582                                self.log_schema_wal(&WalRecord::CreateNodeType {
1583                                    name: inline_effective.clone(),
1584                                    properties: props_for_wal,
1585                                    constraints: Vec::new(),
1586                                });
1587                            }
1588                            if !node_types.contains(&inline_effective) {
1589                                node_types.push(inline_effective);
1590                            }
1591                        }
1592                        InlineElementType::Edge {
1593                            name,
1594                            properties,
1595                            source_node_types,
1596                            target_node_types,
1597                            ..
1598                        } => {
1599                            let inline_effective = self.effective_type_key(name);
1600                            let def = EdgeTypeDefinition {
1601                                name: inline_effective.clone(),
1602                                properties: properties
1603                                    .iter()
1604                                    .map(|p| TypedProperty {
1605                                        name: p.name.clone(),
1606                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1607                                        nullable: p.nullable,
1608                                        default_value: None,
1609                                    })
1610                                    .collect(),
1611                                constraints: Vec::new(),
1612                                source_node_types: source_node_types.clone(),
1613                                target_node_types: target_node_types.clone(),
1614                            };
1615                            self.catalog.register_or_replace_edge_type_def(def);
1616                            #[cfg(feature = "wal")]
1617                            {
1618                                let props_for_wal: Vec<(String, String, bool)> = properties
1619                                    .iter()
1620                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1621                                    .collect();
1622                                self.log_schema_wal(&WalRecord::CreateEdgeType {
1623                                    name: inline_effective.clone(),
1624                                    properties: props_for_wal,
1625                                    constraints: Vec::new(),
1626                                });
1627                            }
1628                            if !edge_types.contains(&inline_effective) {
1629                                edge_types.push(inline_effective);
1630                            }
1631                        }
1632                    }
1633                }
1634
1635                let def = GraphTypeDefinition {
1636                    name: effective_name.clone(),
1637                    allowed_node_types: node_types.clone(),
1638                    allowed_edge_types: edge_types.clone(),
1639                    open,
1640                };
1641                let result = if stmt.or_replace {
1642                    // Drop existing first, ignore error if not found
1643                    let _ = self.catalog.drop_graph_type(&effective_name);
1644                    self.catalog.register_graph_type(def)
1645                } else {
1646                    self.catalog.register_graph_type(def)
1647                };
1648                match result {
1649                    Ok(()) => {
1650                        wal_log!(
1651                            self,
1652                            WalRecord::CreateGraphType {
1653                                name: effective_name.clone(),
1654                                node_types,
1655                                edge_types,
1656                                open,
1657                            }
1658                        );
1659                        Ok(QueryResult::status(format!(
1660                            "Created graph type '{}'",
1661                            stmt.name
1662                        )))
1663                    }
1664                    Err(e) if stmt.if_not_exists => {
1665                        let _ = e;
1666                        Ok(QueryResult::status("No change"))
1667                    }
1668                    Err(e) => Err(Error::Query(QueryError::new(
1669                        QueryErrorKind::Semantic,
1670                        e.to_string(),
1671                    ))),
1672                }
1673            }
1674            SchemaStatement::DropGraphType { name, if_exists } => {
1675                let effective_name = self.effective_type_key(&name);
1676                match self.catalog.drop_graph_type(&effective_name) {
1677                    Ok(()) => {
1678                        wal_log!(
1679                            self,
1680                            WalRecord::DropGraphType {
1681                                name: effective_name
1682                            }
1683                        );
1684                        Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1685                    }
1686                    Err(e) if if_exists => {
1687                        let _ = e;
1688                        Ok(QueryResult::status("No change"))
1689                    }
1690                    Err(e) => Err(Error::Query(QueryError::new(
1691                        QueryErrorKind::Semantic,
1692                        e.to_string(),
1693                    ))),
1694                }
1695            }
1696            SchemaStatement::CreateSchema {
1697                name,
1698                if_not_exists,
1699            } => match self.catalog.register_schema_namespace(name.clone()) {
1700                Ok(()) => {
1701                    wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1702                    // Auto-create the schema's default graph partition so that
1703                    // SESSION SET SCHEMA + queries work without an explicit graph.
1704                    let default_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1705                    if self.store.create_graph(&default_key).unwrap_or(false) {
1706                        wal_log!(self, WalRecord::CreateNamedGraph { name: default_key });
1707                    }
1708                    Ok(QueryResult::status(format!("Created schema '{name}'")))
1709                }
1710                Err(e) if if_not_exists => {
1711                    let _ = e;
1712                    Ok(QueryResult::status("No change"))
1713                }
1714                Err(e) => Err(Error::Query(QueryError::new(
1715                    QueryErrorKind::Semantic,
1716                    e.to_string(),
1717                ))),
1718            },
1719            SchemaStatement::DropSchema { name, if_exists } => {
1720                // ISO/IEC 39075 Section 12.3: schema must be empty before dropping.
1721                // The auto-created __default__ graph is exempt from the check.
1722                let prefix = format!("{name}/");
1723                let default_graph_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1724                let has_graphs = self
1725                    .store
1726                    .graph_names()
1727                    .iter()
1728                    .any(|g| g.starts_with(&prefix) && *g != default_graph_key);
1729                let has_types = self
1730                    .catalog
1731                    .all_node_type_names()
1732                    .iter()
1733                    .any(|n| n.starts_with(&prefix))
1734                    || self
1735                        .catalog
1736                        .all_edge_type_names()
1737                        .iter()
1738                        .any(|n| n.starts_with(&prefix))
1739                    || self
1740                        .catalog
1741                        .all_graph_type_names()
1742                        .iter()
1743                        .any(|n| n.starts_with(&prefix));
1744                if has_graphs || has_types {
1745                    return Err(Error::Query(QueryError::new(
1746                        QueryErrorKind::Semantic,
1747                        format!("Schema '{name}' is not empty: drop all graphs and types first"),
1748                    )));
1749                }
1750                match self.catalog.drop_schema_namespace(&name) {
1751                    Ok(()) => {
1752                        wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1753                        // Drop the auto-created default graph partition
1754                        if self.store.drop_graph(&default_graph_key) {
1755                            wal_log!(
1756                                self,
1757                                WalRecord::DropNamedGraph {
1758                                    name: default_graph_key,
1759                                }
1760                            );
1761                        }
1762                        // If this session was using the dropped schema, reset it
1763                        let mut current = self.current_schema.lock();
1764                        if current
1765                            .as_deref()
1766                            .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1767                        {
1768                            *current = None;
1769                        }
1770                        Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1771                    }
1772                    Err(e) if if_exists => {
1773                        let _ = e;
1774                        Ok(QueryResult::status("No change"))
1775                    }
1776                    Err(e) => Err(Error::Query(QueryError::new(
1777                        QueryErrorKind::Semantic,
1778                        e.to_string(),
1779                    ))),
1780                }
1781            }
1782            SchemaStatement::AlterNodeType(stmt) => {
1783                use grafeo_adapters::query::gql::ast::TypeAlteration;
1784                let effective_name = self.effective_type_key(&stmt.name);
1785                let mut wal_alts = Vec::new();
1786                for alt in &stmt.alterations {
1787                    match alt {
1788                        TypeAlteration::AddProperty(prop) => {
1789                            let typed = TypedProperty {
1790                                name: prop.name.clone(),
1791                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1792                                nullable: prop.nullable,
1793                                default_value: prop
1794                                    .default_value
1795                                    .as_ref()
1796                                    .map(|s| parse_default_literal(s)),
1797                            };
1798                            self.catalog
1799                                .alter_node_type_add_property(&effective_name, typed)
1800                                .map_err(|e| {
1801                                    Error::Query(QueryError::new(
1802                                        QueryErrorKind::Semantic,
1803                                        e.to_string(),
1804                                    ))
1805                                })?;
1806                            wal_alts.push((
1807                                "add".to_string(),
1808                                prop.name.clone(),
1809                                prop.data_type.clone(),
1810                                prop.nullable,
1811                            ));
1812                        }
1813                        TypeAlteration::DropProperty(name) => {
1814                            self.catalog
1815                                .alter_node_type_drop_property(&effective_name, name)
1816                                .map_err(|e| {
1817                                    Error::Query(QueryError::new(
1818                                        QueryErrorKind::Semantic,
1819                                        e.to_string(),
1820                                    ))
1821                                })?;
1822                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1823                        }
1824                    }
1825                }
1826                wal_log!(
1827                    self,
1828                    WalRecord::AlterNodeType {
1829                        name: effective_name,
1830                        alterations: wal_alts,
1831                    }
1832                );
1833                Ok(QueryResult::status(format!(
1834                    "Altered node type '{}'",
1835                    stmt.name
1836                )))
1837            }
1838            SchemaStatement::AlterEdgeType(stmt) => {
1839                use grafeo_adapters::query::gql::ast::TypeAlteration;
1840                let effective_name = self.effective_type_key(&stmt.name);
1841                let mut wal_alts = Vec::new();
1842                for alt in &stmt.alterations {
1843                    match alt {
1844                        TypeAlteration::AddProperty(prop) => {
1845                            let typed = TypedProperty {
1846                                name: prop.name.clone(),
1847                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1848                                nullable: prop.nullable,
1849                                default_value: prop
1850                                    .default_value
1851                                    .as_ref()
1852                                    .map(|s| parse_default_literal(s)),
1853                            };
1854                            self.catalog
1855                                .alter_edge_type_add_property(&effective_name, typed)
1856                                .map_err(|e| {
1857                                    Error::Query(QueryError::new(
1858                                        QueryErrorKind::Semantic,
1859                                        e.to_string(),
1860                                    ))
1861                                })?;
1862                            wal_alts.push((
1863                                "add".to_string(),
1864                                prop.name.clone(),
1865                                prop.data_type.clone(),
1866                                prop.nullable,
1867                            ));
1868                        }
1869                        TypeAlteration::DropProperty(name) => {
1870                            self.catalog
1871                                .alter_edge_type_drop_property(&effective_name, name)
1872                                .map_err(|e| {
1873                                    Error::Query(QueryError::new(
1874                                        QueryErrorKind::Semantic,
1875                                        e.to_string(),
1876                                    ))
1877                                })?;
1878                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1879                        }
1880                    }
1881                }
1882                wal_log!(
1883                    self,
1884                    WalRecord::AlterEdgeType {
1885                        name: effective_name,
1886                        alterations: wal_alts,
1887                    }
1888                );
1889                Ok(QueryResult::status(format!(
1890                    "Altered edge type '{}'",
1891                    stmt.name
1892                )))
1893            }
1894            SchemaStatement::AlterGraphType(stmt) => {
1895                use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1896                let effective_name = self.effective_type_key(&stmt.name);
1897                let mut wal_alts = Vec::new();
1898                for alt in &stmt.alterations {
1899                    match alt {
1900                        GraphTypeAlteration::AddNodeType(name) => {
1901                            self.catalog
1902                                .alter_graph_type_add_node_type(&effective_name, name.clone())
1903                                .map_err(|e| {
1904                                    Error::Query(QueryError::new(
1905                                        QueryErrorKind::Semantic,
1906                                        e.to_string(),
1907                                    ))
1908                                })?;
1909                            wal_alts.push(("add_node_type".to_string(), name.clone()));
1910                        }
1911                        GraphTypeAlteration::DropNodeType(name) => {
1912                            self.catalog
1913                                .alter_graph_type_drop_node_type(&effective_name, name)
1914                                .map_err(|e| {
1915                                    Error::Query(QueryError::new(
1916                                        QueryErrorKind::Semantic,
1917                                        e.to_string(),
1918                                    ))
1919                                })?;
1920                            wal_alts.push(("drop_node_type".to_string(), name.clone()));
1921                        }
1922                        GraphTypeAlteration::AddEdgeType(name) => {
1923                            self.catalog
1924                                .alter_graph_type_add_edge_type(&effective_name, name.clone())
1925                                .map_err(|e| {
1926                                    Error::Query(QueryError::new(
1927                                        QueryErrorKind::Semantic,
1928                                        e.to_string(),
1929                                    ))
1930                                })?;
1931                            wal_alts.push(("add_edge_type".to_string(), name.clone()));
1932                        }
1933                        GraphTypeAlteration::DropEdgeType(name) => {
1934                            self.catalog
1935                                .alter_graph_type_drop_edge_type(&effective_name, name)
1936                                .map_err(|e| {
1937                                    Error::Query(QueryError::new(
1938                                        QueryErrorKind::Semantic,
1939                                        e.to_string(),
1940                                    ))
1941                                })?;
1942                            wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1943                        }
1944                    }
1945                }
1946                wal_log!(
1947                    self,
1948                    WalRecord::AlterGraphType {
1949                        name: effective_name,
1950                        alterations: wal_alts,
1951                    }
1952                );
1953                Ok(QueryResult::status(format!(
1954                    "Altered graph type '{}'",
1955                    stmt.name
1956                )))
1957            }
1958            SchemaStatement::CreateProcedure(stmt) => {
1959                use crate::catalog::ProcedureDefinition;
1960
1961                let def = ProcedureDefinition {
1962                    name: stmt.name.clone(),
1963                    params: stmt
1964                        .params
1965                        .iter()
1966                        .map(|p| (p.name.clone(), p.param_type.clone()))
1967                        .collect(),
1968                    returns: stmt
1969                        .returns
1970                        .iter()
1971                        .map(|r| (r.name.clone(), r.return_type.clone()))
1972                        .collect(),
1973                    body: stmt.body.clone(),
1974                };
1975
1976                if stmt.or_replace {
1977                    self.catalog.replace_procedure(def).map_err(|e| {
1978                        Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1979                    })?;
1980                } else {
1981                    match self.catalog.register_procedure(def) {
1982                        Ok(()) => {}
1983                        Err(_) if stmt.if_not_exists => {
1984                            return Ok(QueryResult::empty());
1985                        }
1986                        Err(e) => {
1987                            return Err(Error::Query(QueryError::new(
1988                                QueryErrorKind::Semantic,
1989                                e.to_string(),
1990                            )));
1991                        }
1992                    }
1993                }
1994
1995                wal_log!(
1996                    self,
1997                    WalRecord::CreateProcedure {
1998                        name: stmt.name.clone(),
1999                        params: stmt
2000                            .params
2001                            .iter()
2002                            .map(|p| (p.name.clone(), p.param_type.clone()))
2003                            .collect(),
2004                        returns: stmt
2005                            .returns
2006                            .iter()
2007                            .map(|r| (r.name.clone(), r.return_type.clone()))
2008                            .collect(),
2009                        body: stmt.body,
2010                    }
2011                );
2012                Ok(QueryResult::status(format!(
2013                    "Created procedure '{}'",
2014                    stmt.name
2015                )))
2016            }
2017            SchemaStatement::DropProcedure { name, if_exists } => {
2018                match self.catalog.drop_procedure(&name) {
2019                    Ok(()) => {}
2020                    Err(_) if if_exists => {
2021                        return Ok(QueryResult::empty());
2022                    }
2023                    Err(e) => {
2024                        return Err(Error::Query(QueryError::new(
2025                            QueryErrorKind::Semantic,
2026                            e.to_string(),
2027                        )));
2028                    }
2029                }
2030                wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
2031                Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
2032            }
2033            SchemaStatement::ShowIndexes => {
2034                return self.execute_show_indexes();
2035            }
2036            SchemaStatement::ShowConstraints => {
2037                return self.execute_show_constraints();
2038            }
2039            SchemaStatement::ShowNodeTypes => {
2040                return self.execute_show_node_types();
2041            }
2042            SchemaStatement::ShowEdgeTypes => {
2043                return self.execute_show_edge_types();
2044            }
2045            SchemaStatement::ShowGraphTypes => {
2046                return self.execute_show_graph_types();
2047            }
2048            SchemaStatement::ShowGraphType(name) => {
2049                return self.execute_show_graph_type(&name);
2050            }
2051            SchemaStatement::ShowCurrentGraphType => {
2052                return self.execute_show_current_graph_type();
2053            }
2054            SchemaStatement::ShowGraphs => {
2055                return self.execute_show_graphs();
2056            }
2057            SchemaStatement::ShowSchemas => {
2058                return self.execute_show_schemas();
2059            }
2060        };
2061
2062        // Invalidate all cached query plans after any successful DDL change.
2063        // DDL is rare, so clearing the entire cache is cheap and correct.
2064        if result.is_ok() {
2065            self.query_cache.clear();
2066        }
2067
2068        result
2069    }
2070
2071    /// Creates a vector index on the store by scanning existing nodes.
2072    #[cfg(all(feature = "lpg", feature = "gql", feature = "vector-index"))]
2073    fn create_vector_index_on_store(
2074        store: &LpgStore,
2075        label: &str,
2076        property: &str,
2077        dimensions: Option<usize>,
2078        metric: Option<&str>,
2079    ) -> Result<()> {
2080        use grafeo_common::types::{PropertyKey, Value};
2081        use grafeo_common::utils::error::Error;
2082        use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex, VectorIndexKind};
2083
2084        let metric = match metric {
2085            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
2086                Error::Internal(format!(
2087                    "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
2088                ))
2089            })?,
2090            None => DistanceMetric::Cosine,
2091        };
2092
2093        let prop_key = PropertyKey::new(property);
2094        let mut found_dims: Option<usize> = dimensions;
2095        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
2096
2097        for node in store.nodes_with_label(label) {
2098            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
2099                if let Some(expected) = found_dims {
2100                    if v.len() != expected {
2101                        return Err(Error::Internal(format!(
2102                            "Vector dimension mismatch: expected {expected}, found {} on node {}",
2103                            v.len(),
2104                            node.id.0
2105                        )));
2106                    }
2107                } else {
2108                    found_dims = Some(v.len());
2109                }
2110                vectors.push((node.id, v.to_vec()));
2111            }
2112        }
2113
2114        let Some(dims) = found_dims else {
2115            return Err(Error::Internal(format!(
2116                "No vector properties found on :{label}({property}) and no dimensions specified"
2117            )));
2118        };
2119
2120        let config = HnswConfig::new(dims, metric);
2121        let index = HnswIndex::with_capacity(config, vectors.len());
2122        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
2123        for (node_id, vec) in &vectors {
2124            index.insert(*node_id, vec, &accessor);
2125        }
2126
2127        store.add_vector_index(label, property, Arc::new(VectorIndexKind::Hnsw(index)));
2128        Ok(())
2129    }
2130
2131    /// Stub for when vector-index feature is not enabled.
2132    #[cfg(all(feature = "lpg", feature = "gql", not(feature = "vector-index")))]
2133    fn create_vector_index_on_store(
2134        _store: &LpgStore,
2135        _label: &str,
2136        _property: &str,
2137        _dimensions: Option<usize>,
2138        _metric: Option<&str>,
2139    ) -> Result<()> {
2140        Err(grafeo_common::utils::error::Error::Internal(
2141            "Vector index support requires the 'vector-index' feature".to_string(),
2142        ))
2143    }
2144
2145    /// Creates a text index on the store by scanning existing nodes.
2146    #[cfg(all(feature = "lpg", feature = "gql", feature = "text-index"))]
2147    fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
2148        use grafeo_common::types::{PropertyKey, Value};
2149        use grafeo_core::index::text::{BM25Config, InvertedIndex};
2150
2151        let mut index = InvertedIndex::new(BM25Config::default());
2152        let prop_key = PropertyKey::new(property);
2153
2154        let nodes = store.nodes_by_label(label);
2155        for node_id in nodes {
2156            if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
2157                index.insert(node_id, text.as_str());
2158            }
2159        }
2160
2161        store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
2162        Ok(())
2163    }
2164
2165    /// Stub for when text-index feature is not enabled.
2166    #[cfg(all(feature = "lpg", feature = "gql", not(feature = "text-index")))]
2167    fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
2168        Err(grafeo_common::utils::error::Error::Internal(
2169            "Text index support requires the 'text-index' feature".to_string(),
2170        ))
2171    }
2172
2173    /// Returns a table of all indexes from the catalog.
2174    fn execute_show_indexes(&self) -> Result<QueryResult> {
2175        let indexes = self.catalog.all_indexes();
2176        let columns = vec![
2177            "name".to_string(),
2178            "type".to_string(),
2179            "label".to_string(),
2180            "property".to_string(),
2181        ];
2182        let rows: Vec<Vec<Value>> = indexes
2183            .into_iter()
2184            .map(|def| {
2185                let label_name = self
2186                    .catalog
2187                    .get_label_name(def.label)
2188                    .unwrap_or_else(|| "?".into());
2189                let prop_name = self
2190                    .catalog
2191                    .get_property_key_name(def.property_key)
2192                    .unwrap_or_else(|| "?".into());
2193                vec![
2194                    Value::from(def.name),
2195                    Value::from(format!("{:?}", def.index_type)),
2196                    Value::from(&*label_name),
2197                    Value::from(&*prop_name),
2198                ]
2199            })
2200            .collect();
2201        Ok(QueryResult {
2202            columns,
2203            column_types: Vec::new(),
2204            rows,
2205            ..QueryResult::empty()
2206        })
2207    }
2208
2209    /// Returns a table of all constraints (currently metadata-only).
2210    fn execute_show_constraints(&self) -> Result<QueryResult> {
2211        // Constraints are tracked in WAL but not yet in a queryable catalog.
2212        // Return an empty table with the expected schema.
2213        Ok(QueryResult {
2214            columns: vec![
2215                "name".to_string(),
2216                "type".to_string(),
2217                "label".to_string(),
2218                "properties".to_string(),
2219            ],
2220            column_types: Vec::new(),
2221            rows: Vec::new(),
2222            ..QueryResult::empty()
2223        })
2224    }
2225
2226    /// Returns a table of all registered node types in the current schema.
2227    fn execute_show_node_types(&self) -> Result<QueryResult> {
2228        let columns = vec![
2229            "name".to_string(),
2230            "properties".to_string(),
2231            "constraints".to_string(),
2232            "parents".to_string(),
2233        ];
2234        let schema = self.current_schema.lock().clone();
2235        let all_names = self.catalog.all_node_type_names();
2236        let type_names: Vec<String> = match &schema {
2237            Some(s) => {
2238                let prefix = format!("{s}/");
2239                all_names
2240                    .into_iter()
2241                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2242                    .collect()
2243            }
2244            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2245        };
2246        let rows: Vec<Vec<Value>> = type_names
2247            .into_iter()
2248            .filter_map(|name| {
2249                let lookup = match &schema {
2250                    Some(s) => format!("{s}/{name}"),
2251                    None => name.clone(),
2252                };
2253                let def = self.catalog.get_node_type(&lookup)?;
2254                let props: Vec<String> = def
2255                    .properties
2256                    .iter()
2257                    .map(|p| {
2258                        let nullable = if p.nullable { "" } else { " NOT NULL" };
2259                        format!("{} {}{}", p.name, p.data_type, nullable)
2260                    })
2261                    .collect();
2262                let constraints: Vec<String> =
2263                    def.constraints.iter().map(|c| format!("{c:?}")).collect();
2264                let parents = def.parent_types.join(", ");
2265                Some(vec![
2266                    Value::from(name),
2267                    Value::from(props.join(", ")),
2268                    Value::from(constraints.join(", ")),
2269                    Value::from(parents),
2270                ])
2271            })
2272            .collect();
2273        Ok(QueryResult {
2274            columns,
2275            column_types: Vec::new(),
2276            rows,
2277            ..QueryResult::empty()
2278        })
2279    }
2280
2281    /// Returns a table of all registered edge types in the current schema.
2282    fn execute_show_edge_types(&self) -> Result<QueryResult> {
2283        let columns = vec![
2284            "name".to_string(),
2285            "properties".to_string(),
2286            "source_types".to_string(),
2287            "target_types".to_string(),
2288        ];
2289        let schema = self.current_schema.lock().clone();
2290        let all_names = self.catalog.all_edge_type_names();
2291        let type_names: Vec<String> = match &schema {
2292            Some(s) => {
2293                let prefix = format!("{s}/");
2294                all_names
2295                    .into_iter()
2296                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2297                    .collect()
2298            }
2299            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2300        };
2301        let rows: Vec<Vec<Value>> = type_names
2302            .into_iter()
2303            .filter_map(|name| {
2304                let lookup = match &schema {
2305                    Some(s) => format!("{s}/{name}"),
2306                    None => name.clone(),
2307                };
2308                let def = self.catalog.get_edge_type_def(&lookup)?;
2309                let props: Vec<String> = def
2310                    .properties
2311                    .iter()
2312                    .map(|p| {
2313                        let nullable = if p.nullable { "" } else { " NOT NULL" };
2314                        format!("{} {}{}", p.name, p.data_type, nullable)
2315                    })
2316                    .collect();
2317                let src = def.source_node_types.join(", ");
2318                let tgt = def.target_node_types.join(", ");
2319                Some(vec![
2320                    Value::from(name),
2321                    Value::from(props.join(", ")),
2322                    Value::from(src),
2323                    Value::from(tgt),
2324                ])
2325            })
2326            .collect();
2327        Ok(QueryResult {
2328            columns,
2329            column_types: Vec::new(),
2330            rows,
2331            ..QueryResult::empty()
2332        })
2333    }
2334
2335    /// Returns a table of all registered graph types in the current schema.
2336    fn execute_show_graph_types(&self) -> Result<QueryResult> {
2337        let columns = vec![
2338            "name".to_string(),
2339            "open".to_string(),
2340            "node_types".to_string(),
2341            "edge_types".to_string(),
2342        ];
2343        let schema = self.current_schema.lock().clone();
2344        let all_names = self.catalog.all_graph_type_names();
2345        let type_names: Vec<String> = match &schema {
2346            Some(s) => {
2347                let prefix = format!("{s}/");
2348                all_names
2349                    .into_iter()
2350                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2351                    .collect()
2352            }
2353            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2354        };
2355        let rows: Vec<Vec<Value>> = type_names
2356            .into_iter()
2357            .filter_map(|name| {
2358                let lookup = match &schema {
2359                    Some(s) => format!("{s}/{name}"),
2360                    None => name.clone(),
2361                };
2362                let def = self.catalog.get_graph_type_def(&lookup)?;
2363                // Strip schema prefix from allowed type names for display
2364                let strip = |n: &String| -> String {
2365                    match &schema {
2366                        Some(s) => n.strip_prefix(&format!("{s}/")).unwrap_or(n).to_string(),
2367                        None => n.clone(),
2368                    }
2369                };
2370                let node_types: Vec<String> = def.allowed_node_types.iter().map(strip).collect();
2371                let edge_types: Vec<String> = def.allowed_edge_types.iter().map(strip).collect();
2372                Some(vec![
2373                    Value::from(name),
2374                    Value::from(def.open),
2375                    Value::from(node_types.join(", ")),
2376                    Value::from(edge_types.join(", ")),
2377                ])
2378            })
2379            .collect();
2380        Ok(QueryResult {
2381            columns,
2382            column_types: Vec::new(),
2383            rows,
2384            ..QueryResult::empty()
2385        })
2386    }
2387
2388    /// Returns the list of named graphs visible in the current schema context.
2389    ///
2390    /// When a session schema is set, only graphs belonging to that schema are
2391    /// shown (their compound prefix is stripped). When no schema is set, graphs
2392    /// without a schema prefix are shown (the default schema).
2393    #[cfg(feature = "lpg")]
2394    fn execute_show_graphs(&self) -> Result<QueryResult> {
2395        let schema = self.current_schema.lock().clone();
2396        let all_names = self.store.graph_names();
2397
2398        let mut names: Vec<String> = match &schema {
2399            Some(s) => {
2400                let prefix = format!("{s}/");
2401                all_names
2402                    .into_iter()
2403                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2404                    .filter(|n| n != SCHEMA_DEFAULT_GRAPH)
2405                    .collect()
2406            }
2407            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2408        };
2409        names.sort();
2410
2411        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2412        Ok(QueryResult {
2413            columns: vec!["name".to_string()],
2414            column_types: Vec::new(),
2415            rows,
2416            ..QueryResult::empty()
2417        })
2418    }
2419
2420    /// Returns the list of all schema namespaces.
2421    fn execute_show_schemas(&self) -> Result<QueryResult> {
2422        let mut names = self.catalog.schema_names();
2423        names.sort();
2424        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2425        Ok(QueryResult {
2426            columns: vec!["name".to_string()],
2427            column_types: Vec::new(),
2428            rows,
2429            ..QueryResult::empty()
2430        })
2431    }
2432
2433    /// Returns detailed info for a specific graph type.
2434    fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
2435        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
2436
2437        let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
2438            Error::Query(QueryError::new(
2439                QueryErrorKind::Semantic,
2440                format!("Graph type '{name}' not found"),
2441            ))
2442        })?;
2443
2444        let columns = vec![
2445            "name".to_string(),
2446            "open".to_string(),
2447            "node_types".to_string(),
2448            "edge_types".to_string(),
2449        ];
2450        let rows = vec![vec![
2451            Value::from(def.name),
2452            Value::from(def.open),
2453            Value::from(def.allowed_node_types.join(", ")),
2454            Value::from(def.allowed_edge_types.join(", ")),
2455        ]];
2456        Ok(QueryResult {
2457            columns,
2458            column_types: Vec::new(),
2459            rows,
2460            ..QueryResult::empty()
2461        })
2462    }
2463
2464    /// Returns the graph type bound to the current graph.
2465    fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
2466        let graph_name = self
2467            .current_graph()
2468            .unwrap_or_else(|| "default".to_string());
2469        let columns = vec![
2470            "graph".to_string(),
2471            "graph_type".to_string(),
2472            "open".to_string(),
2473            "node_types".to_string(),
2474            "edge_types".to_string(),
2475        ];
2476
2477        if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
2478            && let Some(def) = self.catalog.get_graph_type_def(&type_name)
2479        {
2480            let rows = vec![vec![
2481                Value::from(graph_name),
2482                Value::from(type_name),
2483                Value::from(def.open),
2484                Value::from(def.allowed_node_types.join(", ")),
2485                Value::from(def.allowed_edge_types.join(", ")),
2486            ]];
2487            return Ok(QueryResult {
2488                columns,
2489                column_types: Vec::new(),
2490                rows,
2491                ..QueryResult::empty()
2492            });
2493        }
2494
2495        // No graph type binding found
2496        Ok(QueryResult {
2497            columns,
2498            column_types: Vec::new(),
2499            rows: vec![vec![
2500                Value::from(graph_name),
2501                Value::Null,
2502                Value::Null,
2503                Value::Null,
2504                Value::Null,
2505            ]],
2506            ..QueryResult::empty()
2507        })
2508    }
2509
2510    /// Executes a GQL query.
2511    ///
2512    /// # Errors
2513    ///
2514    /// Returns an error if the query fails to parse or execute.
2515    ///
2516    /// # Examples
2517    ///
2518    /// ```no_run
2519    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2520    /// use grafeo_engine::GrafeoDB;
2521    ///
2522    /// let db = GrafeoDB::new_in_memory();
2523    /// let session = db.session();
2524    ///
2525    /// // Create a node
2526    /// session.execute("INSERT (:Person {name: 'Alix', age: 30})")?;
2527    ///
2528    /// // Query nodes
2529    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
2530    /// for row in result.rows() {
2531    ///     println!("{:?}", row);
2532    /// }
2533    /// # Ok(())
2534    /// # }
2535    /// ```
2536    #[cfg(feature = "gql")]
2537    pub fn execute(&self, query: &str) -> Result<QueryResult> {
2538        self.require_lpg("GQL")?;
2539
2540        use crate::query::{
2541            binder::Binder, cache::CacheKey, optimizer::Optimizer, processor::QueryLanguage,
2542            translators::gql,
2543        };
2544
2545        let _span = grafeo_info_span!(
2546            "grafeo::session::execute",
2547            language = "gql",
2548            query_len = query.len(),
2549        );
2550
2551        #[cfg(not(target_arch = "wasm32"))]
2552        let start_time = std::time::Instant::now();
2553
2554        // Parse and translate, checking for session/schema commands first
2555        let translation = gql::translate_full(query)?;
2556        let logical_plan = match translation {
2557            gql::GqlTranslationResult::SessionCommand(cmd) => {
2558                return self.execute_session_command(cmd);
2559            }
2560            #[cfg(feature = "lpg")]
2561            gql::GqlTranslationResult::SchemaCommand(cmd) => {
2562                // All DDL requires Admin role
2563                self.require_permission(crate::auth::StatementKind::Admin)?;
2564                if *self.read_only_tx.lock() {
2565                    return Err(grafeo_common::utils::error::Error::Transaction(
2566                        grafeo_common::utils::error::TransactionError::ReadOnly,
2567                    ));
2568                }
2569                return self.execute_schema_command(cmd);
2570            }
2571            gql::GqlTranslationResult::Plan(plan) => {
2572                // Only walk the operator tree when it matters: non-admin
2573                // identities need permission checks, read-only transactions
2574                // need mutation blocking. Admin sessions in auto-commit mode
2575                // skip the tree walk entirely.
2576                let read_only = *self.read_only_tx.lock();
2577                let need_check = read_only || !self.identity.can_admin();
2578                let is_mutation = need_check && plan.root.has_mutations();
2579                if is_mutation {
2580                    self.require_permission(crate::auth::StatementKind::Write)?;
2581                }
2582                if read_only && is_mutation {
2583                    return Err(grafeo_common::utils::error::Error::Transaction(
2584                        grafeo_common::utils::error::TransactionError::ReadOnly,
2585                    ));
2586                }
2587                plan
2588            }
2589            #[cfg(not(feature = "lpg"))]
2590            gql::GqlTranslationResult::SchemaCommand(_) => {
2591                return Err(grafeo_common::utils::error::Error::Internal(
2592                    "Schema commands require the `lpg` feature".to_string(),
2593                ));
2594            }
2595        };
2596
2597        // Create cache key for this query
2598        let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
2599
2600        // Try to get cached optimized plan, or use the plan we just translated
2601        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2602            cached_plan
2603        } else {
2604            // Semantic validation
2605            let mut binder = Binder::new();
2606            let _binding_context = binder.bind(&logical_plan)?;
2607
2608            // Optimize the plan
2609            let active = self.active_store();
2610            let optimizer = Optimizer::from_graph_store(&*active);
2611            let plan = optimizer.optimize(logical_plan)?;
2612
2613            // Cache the optimized plan for future use
2614            self.query_cache.put_optimized(cache_key, plan.clone());
2615
2616            plan
2617        };
2618
2619        // Resolve the active store for query execution
2620        let active = self.active_store();
2621
2622        // EXPLAIN: annotate pushdown hints and return the plan tree
2623        if optimized_plan.explain {
2624            use crate::query::processor::{annotate_pushdown_hints, explain_result};
2625            let mut plan = optimized_plan;
2626            annotate_pushdown_hints(&mut plan.root, active.as_ref());
2627            return Ok(explain_result(&plan));
2628        }
2629
2630        // PROFILE: execute with per-operator instrumentation
2631        if optimized_plan.profile {
2632            let has_mutations = optimized_plan.root.has_mutations();
2633            return self.with_auto_commit(has_mutations, || {
2634                let (viewing_epoch, transaction_id) = self.get_transaction_context();
2635                let planner = self.create_planner_for_store(
2636                    Arc::clone(&active),
2637                    viewing_epoch,
2638                    transaction_id,
2639                );
2640                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2641
2642                let executor = self.make_executor(physical_plan.columns.clone());
2643                let _result = executor.execute(physical_plan.operator.as_mut())?;
2644
2645                let total_time_ms;
2646                #[cfg(not(target_arch = "wasm32"))]
2647                {
2648                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2649                }
2650                #[cfg(target_arch = "wasm32")]
2651                {
2652                    total_time_ms = 0.0;
2653                }
2654
2655                let profile_tree = crate::query::profile::build_profile_tree(
2656                    &optimized_plan.root,
2657                    &mut entries.into_iter(),
2658                );
2659                Ok(crate::query::profile::profile_result(
2660                    &profile_tree,
2661                    total_time_ms,
2662                ))
2663            });
2664        }
2665
2666        let has_mutations = optimized_plan.root.has_mutations();
2667
2668        let result = self.with_auto_commit(has_mutations, || {
2669            // Get transaction context for MVCC visibility
2670            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2671
2672            // Convert to physical plan with transaction context
2673            // (Physical planning cannot be cached as it depends on transaction state)
2674            // Safe to use read-only fast path when: this query has no mutations AND
2675            // there is no active transaction that may have prior uncommitted writes.
2676            let has_active_tx = self.current_transaction.lock().is_some();
2677            let read_only = !has_mutations && !has_active_tx;
2678            let planner = self.create_planner_for_store_with_read_only(
2679                Arc::clone(&active),
2680                viewing_epoch,
2681                transaction_id,
2682                read_only,
2683            );
2684            let physical_plan = planner.plan(&optimized_plan)?;
2685
2686            // Execute the plan via push-based pipeline when possible
2687            let executor = self.make_executor(physical_plan.columns.clone());
2688            let (mut source, push_ops) = {
2689                #[cfg(feature = "spill")]
2690                {
2691                    let memory_ctx = self.make_operator_memory_context();
2692                    grafeo_core::execution::pipeline_convert::convert_to_pipeline_with_memory(
2693                        physical_plan.into_operator(),
2694                        memory_ctx,
2695                    )
2696                }
2697                #[cfg(not(feature = "spill"))]
2698                {
2699                    grafeo_core::execution::pipeline_convert::convert_to_pipeline(
2700                        physical_plan.into_operator(),
2701                    )
2702                }
2703            };
2704            let mut result = if push_ops.is_empty() {
2705                // Pure source query: use traditional pull-based execution
2706                executor.execute(source.as_mut())?
2707            } else {
2708                // Pipeline execution: push data from source through operators
2709                executor.execute_pipeline(source, push_ops)?
2710            };
2711
2712            // Add execution metrics
2713            let rows_scanned = result.rows.len() as u64;
2714            #[cfg(not(target_arch = "wasm32"))]
2715            {
2716                let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2717                result.execution_time_ms = Some(elapsed_ms);
2718            }
2719            result.rows_scanned = Some(rows_scanned);
2720
2721            Ok(result)
2722        });
2723
2724        // Record metrics for this query execution.
2725        #[cfg(feature = "metrics")]
2726        {
2727            #[cfg(not(target_arch = "wasm32"))]
2728            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2729            #[cfg(target_arch = "wasm32")]
2730            let elapsed_ms = None;
2731            self.record_query_metrics("gql", elapsed_ms, &result);
2732        }
2733
2734        result
2735    }
2736
2737    /// Executes a GQL query with visibility at the specified epoch.
2738    ///
2739    /// This enables time-travel queries: the query sees the database
2740    /// as it existed at the given epoch.
2741    ///
2742    /// # Errors
2743    ///
2744    /// Returns an error if parsing or execution fails.
2745    #[cfg(feature = "gql")]
2746    pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
2747        let previous = self.viewing_epoch_override.lock().replace(epoch);
2748        let result = self.execute(query);
2749        *self.viewing_epoch_override.lock() = previous;
2750        result
2751    }
2752
2753    /// Executes a GQL query at a specific epoch with optional parameters.
2754    ///
2755    /// Combines epoch-based time travel with parameterized queries.
2756    ///
2757    /// # Errors
2758    ///
2759    /// Returns an error if parsing or execution fails.
2760    #[cfg(feature = "gql")]
2761    pub fn execute_at_epoch_with_params(
2762        &self,
2763        query: &str,
2764        epoch: EpochId,
2765        params: Option<std::collections::HashMap<String, Value>>,
2766    ) -> Result<QueryResult> {
2767        let previous = self.viewing_epoch_override.lock().replace(epoch);
2768        let result = if let Some(p) = params {
2769            self.execute_with_params(query, p)
2770        } else {
2771            self.execute(query)
2772        };
2773        *self.viewing_epoch_override.lock() = previous;
2774        result
2775    }
2776
2777    /// Executes a GQL query with parameters.
2778    ///
2779    /// # Errors
2780    ///
2781    /// Returns an error if the query fails to parse or execute.
2782    #[cfg(feature = "gql")]
2783    pub fn execute_with_params(
2784        &self,
2785        query: &str,
2786        params: std::collections::HashMap<String, Value>,
2787    ) -> Result<QueryResult> {
2788        self.require_lpg("GQL")?;
2789
2790        use crate::query::processor::{QueryLanguage, QueryProcessor};
2791
2792        // Reject writes if the identity lacks permission. Parse the query
2793        // to determine mutation status reliably (the text heuristic has false
2794        // negatives that could bypass authorization).
2795        let has_mutations = if self.identity.can_write() {
2796            // Fast path: identity can write, use heuristic for auto-commit only
2797            Self::query_looks_like_mutation(query)
2798        } else {
2799            // Restricted identity: parse to check mutations reliably
2800            use crate::query::translators::gql;
2801            match gql::translate(query) {
2802                Ok(plan) if plan.root.has_mutations() => {
2803                    self.require_permission(crate::auth::StatementKind::Write)?;
2804                    true
2805                }
2806                Ok(_) => false,
2807                // Parse error: let the processor handle it below
2808                Err(_) => Self::query_looks_like_mutation(query),
2809            }
2810        };
2811        let active = self.active_store();
2812
2813        self.with_auto_commit(has_mutations, || {
2814            // Get transaction context for MVCC visibility
2815            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2816
2817            // Create processor with transaction context
2818            let processor = QueryProcessor::for_stores_with_transaction(
2819                Arc::clone(&active),
2820                self.active_write_store(),
2821                Arc::clone(&self.transaction_manager),
2822            )?;
2823
2824            // Apply transaction context if in a transaction
2825            let processor = if let Some(transaction_id) = transaction_id {
2826                processor.with_transaction_context(viewing_epoch, transaction_id)
2827            } else {
2828                processor
2829            };
2830
2831            processor.process(query, QueryLanguage::Gql, Some(&params))
2832        })
2833    }
2834
2835    /// Executes a GQL query with parameters.
2836    ///
2837    /// # Errors
2838    ///
2839    /// Returns an error if no query language is enabled.
2840    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2841    pub fn execute_with_params(
2842        &self,
2843        _query: &str,
2844        _params: std::collections::HashMap<String, Value>,
2845    ) -> Result<QueryResult> {
2846        Err(grafeo_common::utils::error::Error::Internal(
2847            "No query language enabled".to_string(),
2848        ))
2849    }
2850
2851    /// Executes a GQL query.
2852    ///
2853    /// # Errors
2854    ///
2855    /// Returns an error if no query language is enabled.
2856    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2857    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2858        Err(grafeo_common::utils::error::Error::Internal(
2859            "No query language enabled".to_string(),
2860        ))
2861    }
2862
2863    /// Executes a Cypher query.
2864    ///
2865    /// # Errors
2866    ///
2867    /// Returns an error if the query fails to parse or execute.
2868    #[cfg(feature = "cypher")]
2869    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
2870        use crate::query::{
2871            binder::Binder, cache::CacheKey, optimizer::Optimizer, processor::QueryLanguage,
2872            translators::cypher,
2873        };
2874
2875        // Handle schema DDL and SHOW commands before the normal query path
2876        let translation = cypher::translate_full(query)?;
2877        match translation {
2878            #[cfg(feature = "lpg")]
2879            cypher::CypherTranslationResult::SchemaCommand(cmd) => {
2880                use grafeo_common::utils::error::{
2881                    Error as GrafeoError, QueryError, QueryErrorKind,
2882                };
2883                self.require_permission(crate::auth::StatementKind::Admin)?;
2884                if *self.read_only_tx.lock() {
2885                    return Err(GrafeoError::Query(QueryError::new(
2886                        QueryErrorKind::Semantic,
2887                        "Cannot execute schema DDL in a read-only transaction",
2888                    )));
2889                }
2890                return self.execute_schema_command(cmd);
2891            }
2892            #[cfg(not(feature = "lpg"))]
2893            cypher::CypherTranslationResult::SchemaCommand(_) => {
2894                return Err(grafeo_common::utils::error::Error::Internal(
2895                    "Schema DDL requires the `lpg` feature".to_string(),
2896                ));
2897            }
2898            cypher::CypherTranslationResult::ShowIndexes => {
2899                return self.execute_show_indexes();
2900            }
2901            cypher::CypherTranslationResult::ShowConstraints => {
2902                return self.execute_show_constraints();
2903            }
2904            cypher::CypherTranslationResult::ShowCurrentGraphType => {
2905                return self.execute_show_current_graph_type();
2906            }
2907            cypher::CypherTranslationResult::Plan(_) => {
2908                // Fall through to normal execution below
2909            }
2910        }
2911
2912        #[cfg(not(target_arch = "wasm32"))]
2913        let start_time = std::time::Instant::now();
2914
2915        // Create cache key for this query
2916        let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());
2917
2918        // Try to get cached optimized plan
2919        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2920            cached_plan
2921        } else {
2922            // Parse and translate the query to a logical plan
2923            let logical_plan = cypher::translate(query)?;
2924
2925            // Semantic validation
2926            let mut binder = Binder::new();
2927            let _binding_context = binder.bind(&logical_plan)?;
2928
2929            // Optimize the plan
2930            let active = self.active_store();
2931            let optimizer = Optimizer::from_graph_store(&*active);
2932            let plan = optimizer.optimize(logical_plan)?;
2933
2934            // Cache the optimized plan
2935            self.query_cache.put_optimized(cache_key, plan.clone());
2936
2937            plan
2938        };
2939
2940        // Check role-based permission for mutations
2941        if optimized_plan.root.has_mutations() {
2942            self.require_permission(crate::auth::StatementKind::Write)?;
2943        }
2944
2945        // Resolve the active store for query execution
2946        let active = self.active_store();
2947
2948        // EXPLAIN
2949        if optimized_plan.explain {
2950            use crate::query::processor::{annotate_pushdown_hints, explain_result};
2951            let mut plan = optimized_plan;
2952            annotate_pushdown_hints(&mut plan.root, active.as_ref());
2953            return Ok(explain_result(&plan));
2954        }
2955
2956        // PROFILE
2957        if optimized_plan.profile {
2958            let has_mutations = optimized_plan.root.has_mutations();
2959            return self.with_auto_commit(has_mutations, || {
2960                let (viewing_epoch, transaction_id) = self.get_transaction_context();
2961                let planner = self.create_planner_for_store(
2962                    Arc::clone(&active),
2963                    viewing_epoch,
2964                    transaction_id,
2965                );
2966                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2967
2968                let executor = self.make_executor(physical_plan.columns.clone());
2969                let _result = executor.execute(physical_plan.operator.as_mut())?;
2970
2971                let total_time_ms;
2972                #[cfg(not(target_arch = "wasm32"))]
2973                {
2974                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2975                }
2976                #[cfg(target_arch = "wasm32")]
2977                {
2978                    total_time_ms = 0.0;
2979                }
2980
2981                let profile_tree = crate::query::profile::build_profile_tree(
2982                    &optimized_plan.root,
2983                    &mut entries.into_iter(),
2984                );
2985                Ok(crate::query::profile::profile_result(
2986                    &profile_tree,
2987                    total_time_ms,
2988                ))
2989            });
2990        }
2991
2992        let has_mutations = optimized_plan.root.has_mutations();
2993
2994        let result = self.with_auto_commit(has_mutations, || {
2995            // Get transaction context for MVCC visibility
2996            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2997
2998            // Convert to physical plan with transaction context
2999            let planner =
3000                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3001            let mut physical_plan = planner.plan(&optimized_plan)?;
3002
3003            // Execute the plan
3004            let executor = self.make_executor(physical_plan.columns.clone());
3005            executor.execute(physical_plan.operator.as_mut())
3006        });
3007
3008        #[cfg(feature = "metrics")]
3009        {
3010            #[cfg(not(target_arch = "wasm32"))]
3011            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3012            #[cfg(target_arch = "wasm32")]
3013            let elapsed_ms = None;
3014            self.record_query_metrics("cypher", elapsed_ms, &result);
3015        }
3016
3017        result
3018    }
3019
3020    /// Executes a Gremlin query.
3021    ///
3022    /// # Errors
3023    ///
3024    /// Returns an error if the query fails to parse or execute.
3025    ///
3026    /// # Examples
3027    ///
3028    /// ```no_run
3029    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3030    /// use grafeo_engine::GrafeoDB;
3031    ///
3032    /// let db = GrafeoDB::new_in_memory();
3033    /// let session = db.session();
3034    ///
3035    /// // Create some nodes first
3036    /// session.create_node(&["Person"]);
3037    ///
3038    /// // Query using Gremlin
3039    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
3040    /// # Ok(())
3041    /// # }
3042    /// ```
3043    #[cfg(feature = "gremlin")]
3044    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
3045        use crate::query::{binder::Binder, optimizer::Optimizer, translators::gremlin};
3046
3047        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3048        let start_time = Instant::now();
3049
3050        // Parse and translate the query to a logical plan
3051        let logical_plan = gremlin::translate(query)?;
3052
3053        // Semantic validation
3054        let mut binder = Binder::new();
3055        let _binding_context = binder.bind(&logical_plan)?;
3056
3057        // Optimize the plan
3058        let active = self.active_store();
3059        let optimizer = Optimizer::from_graph_store(&*active);
3060        let optimized_plan = optimizer.optimize(logical_plan)?;
3061
3062        let has_mutations = optimized_plan.root.has_mutations();
3063        if has_mutations {
3064            self.require_permission(crate::auth::StatementKind::Write)?;
3065        }
3066
3067        let result = self.with_auto_commit(has_mutations, || {
3068            // Get transaction context for MVCC visibility
3069            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3070
3071            // Convert to physical plan with transaction context
3072            let planner =
3073                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3074            let mut physical_plan = planner.plan(&optimized_plan)?;
3075
3076            // Execute the plan
3077            let executor = self.make_executor(physical_plan.columns.clone());
3078            executor.execute(physical_plan.operator.as_mut())
3079        });
3080
3081        #[cfg(feature = "metrics")]
3082        {
3083            #[cfg(not(target_arch = "wasm32"))]
3084            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3085            #[cfg(target_arch = "wasm32")]
3086            let elapsed_ms = None;
3087            self.record_query_metrics("gremlin", elapsed_ms, &result);
3088        }
3089
3090        result
3091    }
3092
3093    /// Executes a Gremlin query with parameters.
3094    ///
3095    /// # Errors
3096    ///
3097    /// Returns an error if the query fails to parse or execute.
3098    #[cfg(feature = "gremlin")]
3099    pub fn execute_gremlin_with_params(
3100        &self,
3101        query: &str,
3102        params: std::collections::HashMap<String, Value>,
3103    ) -> Result<QueryResult> {
3104        use crate::query::{
3105            binder::Binder, optimizer::Optimizer, processor::substitute_params,
3106            translators::gremlin,
3107        };
3108
3109        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3110        let start_time = Instant::now();
3111
3112        // Parse and translate the query to a logical plan
3113        let mut logical_plan = gremlin::translate(query)?;
3114
3115        // Substitute parameters
3116        substitute_params(&mut logical_plan, &params)?;
3117
3118        // Semantic validation
3119        let mut binder = Binder::new();
3120        let _binding_context = binder.bind(&logical_plan)?;
3121
3122        // Optimize the plan
3123        let active = self.active_store();
3124        let optimizer = Optimizer::from_graph_store(&*active);
3125        let optimized_plan = optimizer.optimize(logical_plan)?;
3126
3127        let has_mutations = optimized_plan.root.has_mutations();
3128        if has_mutations {
3129            self.require_permission(crate::auth::StatementKind::Write)?;
3130        }
3131
3132        let result = self.with_auto_commit(has_mutations, || {
3133            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3134            let planner =
3135                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3136            let mut physical_plan = planner.plan(&optimized_plan)?;
3137            let executor = self.make_executor(physical_plan.columns.clone());
3138            executor.execute(physical_plan.operator.as_mut())
3139        });
3140
3141        #[cfg(feature = "metrics")]
3142        {
3143            #[cfg(not(target_arch = "wasm32"))]
3144            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3145            #[cfg(target_arch = "wasm32")]
3146            let elapsed_ms = None;
3147            self.record_query_metrics("gremlin", elapsed_ms, &result);
3148        }
3149
3150        result
3151    }
3152
3153    /// Executes a GraphQL query against the LPG store.
3154    ///
3155    /// # Errors
3156    ///
3157    /// Returns an error if the query fails to parse or execute.
3158    ///
3159    /// # Examples
3160    ///
3161    /// ```no_run
3162    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3163    /// use grafeo_engine::GrafeoDB;
3164    ///
3165    /// let db = GrafeoDB::new_in_memory();
3166    /// let session = db.session();
3167    ///
3168    /// // Create some nodes first
3169    /// session.create_node(&["User"]);
3170    ///
3171    /// // Query using GraphQL
3172    /// let result = session.execute_graphql("query { user { id name } }")?;
3173    /// # Ok(())
3174    /// # }
3175    /// ```
3176    #[cfg(feature = "graphql")]
3177    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
3178        use crate::query::{
3179            binder::Binder, optimizer::Optimizer, processor::substitute_params,
3180            translators::graphql,
3181        };
3182
3183        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3184        let start_time = Instant::now();
3185
3186        let mut logical_plan = graphql::translate(query)?;
3187
3188        // Substitute default parameter values from variable declarations
3189        if !logical_plan.default_params.is_empty() {
3190            let defaults = logical_plan.default_params.clone();
3191            substitute_params(&mut logical_plan, &defaults)?;
3192        }
3193
3194        let mut binder = Binder::new();
3195        let _binding_context = binder.bind(&logical_plan)?;
3196
3197        let active = self.active_store();
3198        let optimizer = Optimizer::from_graph_store(&*active);
3199        let optimized_plan = optimizer.optimize(logical_plan)?;
3200        let has_mutations = optimized_plan.root.has_mutations();
3201        if has_mutations {
3202            self.require_permission(crate::auth::StatementKind::Write)?;
3203        }
3204
3205        let result = self.with_auto_commit(has_mutations, || {
3206            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3207            let planner =
3208                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3209            let mut physical_plan = planner.plan(&optimized_plan)?;
3210            let executor = self.make_executor(physical_plan.columns.clone());
3211            executor.execute(physical_plan.operator.as_mut())
3212        });
3213
3214        #[cfg(feature = "metrics")]
3215        {
3216            #[cfg(not(target_arch = "wasm32"))]
3217            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3218            #[cfg(target_arch = "wasm32")]
3219            let elapsed_ms = None;
3220            self.record_query_metrics("graphql", elapsed_ms, &result);
3221        }
3222
3223        result
3224    }
3225
3226    /// Executes a GraphQL query with parameters.
3227    ///
3228    /// # Errors
3229    ///
3230    /// Returns an error if the query fails to parse or execute.
3231    #[cfg(feature = "graphql")]
3232    pub fn execute_graphql_with_params(
3233        &self,
3234        query: &str,
3235        params: std::collections::HashMap<String, Value>,
3236    ) -> Result<QueryResult> {
3237        use crate::query::{
3238            binder::Binder, optimizer::Optimizer, processor::substitute_params,
3239            translators::graphql,
3240        };
3241
3242        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3243        let start_time = Instant::now();
3244
3245        // Parse and translate the query to a logical plan
3246        let mut logical_plan = graphql::translate(query)?;
3247
3248        // Merge default params with caller-supplied params
3249        if !logical_plan.default_params.is_empty() {
3250            let mut merged = logical_plan.default_params.clone();
3251            merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
3252            substitute_params(&mut logical_plan, &merged)?;
3253        } else {
3254            substitute_params(&mut logical_plan, &params)?;
3255        }
3256
3257        // Semantic validation
3258        let mut binder = Binder::new();
3259        let _binding_context = binder.bind(&logical_plan)?;
3260
3261        // Optimize the plan
3262        let active = self.active_store();
3263        let optimizer = Optimizer::from_graph_store(&*active);
3264        let optimized_plan = optimizer.optimize(logical_plan)?;
3265
3266        let has_mutations = optimized_plan.root.has_mutations();
3267        if has_mutations {
3268            self.require_permission(crate::auth::StatementKind::Write)?;
3269        }
3270
3271        let result = self.with_auto_commit(has_mutations, || {
3272            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3273            let planner =
3274                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3275            let mut physical_plan = planner.plan(&optimized_plan)?;
3276            let executor = self.make_executor(physical_plan.columns.clone());
3277            executor.execute(physical_plan.operator.as_mut())
3278        });
3279
3280        #[cfg(feature = "metrics")]
3281        {
3282            #[cfg(not(target_arch = "wasm32"))]
3283            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3284            #[cfg(target_arch = "wasm32")]
3285            let elapsed_ms = None;
3286            self.record_query_metrics("graphql", elapsed_ms, &result);
3287        }
3288
3289        result
3290    }
3291
3292    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
3293    ///
3294    /// # Errors
3295    ///
3296    /// Returns an error if the query fails to parse or execute.
3297    ///
3298    /// # Examples
3299    ///
3300    /// ```no_run
3301    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3302    /// use grafeo_engine::GrafeoDB;
3303    ///
3304    /// let db = GrafeoDB::new_in_memory();
3305    /// let session = db.session();
3306    ///
3307    /// let result = session.execute_sql(
3308    ///     "SELECT * FROM GRAPH_TABLE (
3309    ///         MATCH (n:Person)
3310    ///         COLUMNS (n.name AS name)
3311    ///     )"
3312    /// )?;
3313    /// # Ok(())
3314    /// # }
3315    /// ```
3316    #[cfg(feature = "sql-pgq")]
3317    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
3318        use crate::query::{
3319            binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
3320            processor::QueryLanguage, translators::sql_pgq,
3321        };
3322
3323        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3324        let start_time = Instant::now();
3325
3326        // Parse and translate (always needed to check for DDL)
3327        let logical_plan = sql_pgq::translate(query)?;
3328
3329        // Handle DDL statements directly (they don't go through the query pipeline)
3330        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
3331            self.require_permission(crate::auth::StatementKind::Admin)?;
3332            return Ok(QueryResult {
3333                columns: vec!["status".into()],
3334                column_types: vec![grafeo_common::types::LogicalType::String],
3335                rows: vec![vec![Value::from(format!(
3336                    "Property graph '{}' created ({} node tables, {} edge tables)",
3337                    cpg.name,
3338                    cpg.node_tables.len(),
3339                    cpg.edge_tables.len()
3340                ))]],
3341                execution_time_ms: None,
3342                rows_scanned: None,
3343                status_message: None,
3344                gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
3345            });
3346        }
3347
3348        let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());
3349
3350        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
3351            cached_plan
3352        } else {
3353            let mut binder = Binder::new();
3354            let _binding_context = binder.bind(&logical_plan)?;
3355            let active = self.active_store();
3356            let optimizer = Optimizer::from_graph_store(&*active);
3357            let plan = optimizer.optimize(logical_plan)?;
3358            self.query_cache.put_optimized(cache_key, plan.clone());
3359            plan
3360        };
3361
3362        let active = self.active_store();
3363        let has_mutations = optimized_plan.root.has_mutations();
3364        if has_mutations {
3365            self.require_permission(crate::auth::StatementKind::Write)?;
3366        }
3367
3368        let result = self.with_auto_commit(has_mutations, || {
3369            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3370            let planner =
3371                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3372            let mut physical_plan = planner.plan(&optimized_plan)?;
3373            let executor = self.make_executor(physical_plan.columns.clone());
3374            executor.execute(physical_plan.operator.as_mut())
3375        });
3376
3377        #[cfg(feature = "metrics")]
3378        {
3379            #[cfg(not(target_arch = "wasm32"))]
3380            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3381            #[cfg(target_arch = "wasm32")]
3382            let elapsed_ms = None;
3383            self.record_query_metrics("sql", elapsed_ms, &result);
3384        }
3385
3386        result
3387    }
3388
3389    /// Executes a SQL/PGQ query with parameters.
3390    ///
3391    /// # Errors
3392    ///
3393    /// Returns an error if the query fails to parse or execute.
3394    #[cfg(feature = "sql-pgq")]
3395    pub fn execute_sql_with_params(
3396        &self,
3397        query: &str,
3398        params: std::collections::HashMap<String, Value>,
3399    ) -> Result<QueryResult> {
3400        use crate::query::processor::{QueryLanguage, QueryProcessor};
3401
3402        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3403        let start_time = Instant::now();
3404
3405        let has_mutations = if self.identity.can_write() {
3406            Self::query_looks_like_mutation(query)
3407        } else {
3408            use crate::query::translators::sql_pgq;
3409            match sql_pgq::translate(query) {
3410                Ok(plan) if plan.root.has_mutations() => {
3411                    self.require_permission(crate::auth::StatementKind::Write)?;
3412                    true
3413                }
3414                Ok(_) => false,
3415                Err(_) => Self::query_looks_like_mutation(query),
3416            }
3417        };
3418        if has_mutations {
3419            self.require_permission(crate::auth::StatementKind::Write)?;
3420        }
3421        let active = self.active_store();
3422
3423        let result = self.with_auto_commit(has_mutations, || {
3424            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3425            let processor = QueryProcessor::for_stores_with_transaction(
3426                Arc::clone(&active),
3427                self.active_write_store(),
3428                Arc::clone(&self.transaction_manager),
3429            )?;
3430            let processor = if let Some(transaction_id) = transaction_id {
3431                processor.with_transaction_context(viewing_epoch, transaction_id)
3432            } else {
3433                processor
3434            };
3435            processor.process(query, QueryLanguage::SqlPgq, Some(&params))
3436        });
3437
3438        #[cfg(feature = "metrics")]
3439        {
3440            #[cfg(not(target_arch = "wasm32"))]
3441            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3442            #[cfg(target_arch = "wasm32")]
3443            let elapsed_ms = None;
3444            self.record_query_metrics("sql", elapsed_ms, &result);
3445        }
3446
3447        result
3448    }
3449
3450    /// Executes a query in the specified language by name.
3451    ///
3452    /// Supported language names: `"gql"`, `"cypher"`, `"gremlin"`, `"graphql"`,
3453    /// `"graphql-rdf"`, `"sparql"`, `"sql"`. Each requires the corresponding feature flag.
3454    ///
3455    /// # Errors
3456    ///
3457    /// Returns an error if the language is unknown/disabled or the query fails.
3458    pub fn execute_language(
3459        &self,
3460        query: &str,
3461        language: &str,
3462        params: Option<std::collections::HashMap<String, Value>>,
3463    ) -> Result<QueryResult> {
3464        let _span = grafeo_info_span!(
3465            "grafeo::session::execute",
3466            language,
3467            query_len = query.len(),
3468        );
3469        match language {
3470            "gql" => {
3471                if let Some(p) = params {
3472                    self.execute_with_params(query, p)
3473                } else {
3474                    self.execute(query)
3475                }
3476            }
3477            #[cfg(feature = "cypher")]
3478            "cypher" => {
3479                if let Some(p) = params {
3480                    use crate::query::processor::{QueryLanguage, QueryProcessor};
3481
3482                    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3483                    let start_time = Instant::now();
3484
3485                    let has_mutations = if self.identity.can_write() {
3486                        Self::query_looks_like_mutation(query)
3487                    } else {
3488                        use crate::query::translators::cypher;
3489                        match cypher::translate(query) {
3490                            Ok(plan) if plan.root.has_mutations() => {
3491                                self.require_permission(crate::auth::StatementKind::Write)?;
3492                                true
3493                            }
3494                            Ok(_) => false,
3495                            Err(_) => Self::query_looks_like_mutation(query),
3496                        }
3497                    };
3498                    let active = self.active_store();
3499                    let result = self.with_auto_commit(has_mutations, || {
3500                        let processor = QueryProcessor::for_stores_with_transaction(
3501                            Arc::clone(&active),
3502                            self.active_write_store(),
3503                            Arc::clone(&self.transaction_manager),
3504                        )?;
3505                        let (viewing_epoch, transaction_id) = self.get_transaction_context();
3506                        let processor = if let Some(transaction_id) = transaction_id {
3507                            processor.with_transaction_context(viewing_epoch, transaction_id)
3508                        } else {
3509                            processor
3510                        };
3511                        processor.process(query, QueryLanguage::Cypher, Some(&p))
3512                    });
3513
3514                    #[cfg(feature = "metrics")]
3515                    {
3516                        #[cfg(not(target_arch = "wasm32"))]
3517                        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3518                        #[cfg(target_arch = "wasm32")]
3519                        let elapsed_ms = None;
3520                        self.record_query_metrics("cypher", elapsed_ms, &result);
3521                    }
3522
3523                    result
3524                } else {
3525                    self.execute_cypher(query)
3526                }
3527            }
3528            #[cfg(feature = "gremlin")]
3529            "gremlin" => {
3530                if let Some(p) = params {
3531                    self.execute_gremlin_with_params(query, p)
3532                } else {
3533                    self.execute_gremlin(query)
3534                }
3535            }
3536            #[cfg(feature = "graphql")]
3537            "graphql" => {
3538                if let Some(p) = params {
3539                    self.execute_graphql_with_params(query, p)
3540                } else {
3541                    self.execute_graphql(query)
3542                }
3543            }
3544            #[cfg(all(feature = "graphql", feature = "triple-store"))]
3545            "graphql-rdf" => {
3546                if let Some(p) = params {
3547                    self.execute_graphql_rdf_with_params(query, p)
3548                } else {
3549                    self.execute_graphql_rdf(query)
3550                }
3551            }
3552            #[cfg(feature = "sql-pgq")]
3553            "sql" | "sql-pgq" => {
3554                if let Some(p) = params {
3555                    self.execute_sql_with_params(query, p)
3556                } else {
3557                    self.execute_sql(query)
3558                }
3559            }
3560            #[cfg(all(feature = "sparql", feature = "triple-store"))]
3561            "sparql" => {
3562                if let Some(p) = params {
3563                    self.execute_sparql_with_params(query, p)
3564                } else {
3565                    self.execute_sparql(query)
3566                }
3567            }
3568            other => Err(grafeo_common::utils::error::Error::Query(
3569                grafeo_common::utils::error::QueryError::new(
3570                    grafeo_common::utils::error::QueryErrorKind::Semantic,
3571                    format!("Unknown query language: '{other}'"),
3572                ),
3573            )),
3574        }
3575    }
3576
    /// Clears all cached query plans.
    ///
    /// The plan cache is shared across all sessions on the same database,
    /// so clearing from one session affects all sessions.
    pub fn clear_plan_cache(&self) {
        self.query_cache.clear();
    }

    /// Begins a new transaction on this session.
    ///
    /// Uses the default isolation level (`SnapshotIsolation`).
    ///
    /// If a transaction is already active, no second transaction is started.
    /// Instead a nested level is opened, backed by an automatically named
    /// savepoint; the matching [`commit`](Self::commit) releases that
    /// savepoint and the matching [`rollback`](Self::rollback) rolls back to it.
    ///
    /// # Errors
    ///
    /// Returns an error if the savepoint for a nested level cannot be created.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use grafeo_engine::GrafeoDB;
    ///
    /// let db = GrafeoDB::new_in_memory();
    /// let mut session = db.session();
    ///
    /// session.begin_transaction()?;
    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
    /// session.execute("INSERT (:Person {name: 'Gus'})")?;
    /// session.commit()?; // Both inserts committed atomically
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(feature = "lpg")]
    pub fn begin_transaction(&mut self) -> Result<()> {
        self.begin_transaction_inner(false, None)
    }
3618
    /// Begins a transaction with a specific isolation level.
    ///
    /// See [`begin_transaction`](Self::begin_transaction) for the default (`SnapshotIsolation`).
    ///
    /// If a transaction is already active, a nested level backed by a
    /// savepoint is opened instead and `isolation_level` is not applied: the
    /// nested level runs inside the already-active transaction.
    ///
    /// # Errors
    ///
    /// Returns an error if the savepoint for a nested level cannot be created.
    #[cfg(feature = "lpg")]
    pub fn begin_transaction_with_isolation(
        &mut self,
        isolation_level: crate::transaction::IsolationLevel,
    ) -> Result<()> {
        self.begin_transaction_inner(false, Some(isolation_level))
    }
3633
3634    /// Core transaction begin logic, usable from both `&mut self` and `&self` paths.
3635    #[cfg(feature = "lpg")]
3636    fn begin_transaction_inner(
3637        &self,
3638        read_only: bool,
3639        isolation_level: Option<crate::transaction::IsolationLevel>,
3640    ) -> Result<()> {
3641        let _span = grafeo_debug_span!("grafeo::tx::begin", read_only);
3642        let mut current = self.current_transaction.lock();
3643        if current.is_some() {
3644            // Nested transaction: create an auto-savepoint instead of a new tx.
3645            drop(current);
3646            let mut depth = self.transaction_nesting_depth.lock();
3647            *depth += 1;
3648            let sp_name = format!("_nested_tx_{}", *depth);
3649            self.savepoint(&sp_name)?;
3650            return Ok(());
3651        }
3652
3653        let active = self.active_lpg_store();
3654        self.transaction_start_node_count
3655            .store(active.node_count(), Ordering::Relaxed);
3656        self.transaction_start_edge_count
3657            .store(active.edge_count(), Ordering::Relaxed);
3658        let transaction_id = if let Some(level) = isolation_level {
3659            self.transaction_manager.begin_with_isolation(level)
3660        } else {
3661            self.transaction_manager.begin()
3662        };
3663        *current = Some(transaction_id);
3664        *self.read_only_tx.lock() = read_only || self.db_read_only;
3665
3666        // Record the initial graph as "touched" for cross-graph atomicity.
3667        // Uses the full storage key (schema/graph) for schema-scoped resolution.
3668        let key = self.active_graph_storage_key();
3669        let mut touched = self.touched_graphs.lock();
3670        touched.clear();
3671        touched.push(key);
3672
3673        #[cfg(feature = "metrics")]
3674        {
3675            crate::metrics::record_metric!(self.metrics, tx_active, inc);
3676            #[cfg(not(target_arch = "wasm32"))]
3677            {
3678                *self.tx_start_time.lock() = Some(Instant::now());
3679            }
3680        }
3681
3682        Ok(())
3683    }
3684
    /// Commits the current transaction.
    ///
    /// Makes all changes since [`begin_transaction`](Self::begin_transaction) permanent.
    ///
    /// Inside a nested level (opened by beginning a transaction while one was
    /// already active) this only releases that level's savepoint; the outer
    /// transaction stays open until its own commit.
    ///
    /// # Errors
    ///
    /// Returns an error if no transaction is active, or if the transaction
    /// manager rejects the commit (the session's transaction is ended in that
    /// case and its property changes are rolled back).
    #[cfg(feature = "lpg")]
    pub fn commit(&mut self) -> Result<()> {
        self.commit_inner()
    }
3696
    /// Core commit logic, usable from both `&mut self` and `&self` paths.
    ///
    /// Inside a nested level (nesting depth > 0) this only decrements the depth
    /// and releases the `_nested_tx_<depth>` savepoint; the outer transaction
    /// stays open.
    ///
    /// Otherwise the session's transaction is taken and handed to the
    /// transaction manager. On success, in order: version epochs are finalized
    /// in every touched graph, the RDF transaction is committed
    /// (`triple-store`), property undo logs are committed, buffered CDC events
    /// are stamped with the commit epoch and recorded (`cdc`), commit and
    /// epoch-advance records are written to the WAL (`wal`; failures are only
    /// logged), touched stores are synced to the manager's current epoch, and
    /// the read-only flag and savepoints are reset. Whenever the incremented
    /// commit counter is a multiple of `gc_interval`, version GC runs as well.
    ///
    /// # Errors
    ///
    /// Returns an error if no transaction is active, if releasing the nested
    /// savepoint fails, or if the transaction manager rejects the commit. In
    /// the last case the property changes of every touched graph are rolled
    /// back and the transaction-scoped session state is cleared before the
    /// error is returned.
    #[cfg(feature = "lpg")]
    fn commit_inner(&self) -> Result<()> {
        let _span = grafeo_debug_span!("grafeo::tx::commit");
        // Nested transaction: release the auto-savepoint (changes are preserved).
        {
            let mut depth = self.transaction_nesting_depth.lock();
            if *depth > 0 {
                // Build the name from the current depth before decrementing it.
                let sp_name = format!("_nested_tx_{depth}");
                *depth -= 1;
                // `release_savepoint` takes other session locks; don't hold this one.
                drop(depth);
                return self.release_savepoint(&sp_name);
            }
        }

        // `take()` ends the session's ownership of the transaction up front,
        // on both the success and the failure path below.
        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
            grafeo_common::utils::error::Error::Transaction(
                grafeo_common::utils::error::TransactionError::InvalidState(
                    "No active transaction".to_string(),
                ),
            )
        })?;

        // Validate the transaction first (conflict detection) before committing data.
        // If this fails, we rollback the data changes instead of making them permanent.
        //
        // Take ownership of the touched graphs in one lock acquisition. Since
        // current_transaction was .take()'d above, no concurrent thread can call
        // track_graph_touch() for this transaction (it checks current_transaction
        // first), so this is safe.
        let touched = std::mem::take(&mut *self.touched_graphs.lock());
        let commit_epoch = match self.transaction_manager.commit(transaction_id) {
            Ok(epoch) => epoch,
            Err(e) => {
                // Conflict detected: rollback the data changes
                for graph_name in &touched {
                    let store = self.resolve_store(graph_name);
                    store.rollback_transaction_properties(transaction_id);
                }
                #[cfg(feature = "triple-store")]
                self.rollback_rdf_transaction(transaction_id);
                // Discard buffered CDC events on conflict rollback
                #[cfg(feature = "cdc")]
                if let Some(ref pending) = self.cdc_pending_events {
                    pending.lock().clear();
                }
                *self.read_only_tx.lock() = self.db_read_only;
                self.savepoints.lock().clear();
                // Already emptied by `mem::take` above; kept as a defensive reset.
                self.touched_graphs.lock().clear();
                #[cfg(feature = "metrics")]
                {
                    // The transaction is over either way: count it as a conflict
                    // and still observe how long it ran.
                    crate::metrics::record_metric!(self.metrics, tx_active, dec);
                    crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
                    #[cfg(not(target_arch = "wasm32"))]
                    if let Some(start) = self.tx_start_time.lock().take() {
                        let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
                        crate::metrics::record_metric!(
                            self.metrics,
                            tx_duration,
                            observe duration_ms
                        );
                    }
                }
                return Err(e);
            }
        };

        // Finalize PENDING epochs: make uncommitted versions visible at the commit epoch.
        for graph_name in &touched {
            let store = self.resolve_store(graph_name);
            store.finalize_version_epochs(transaction_id, commit_epoch);
        }

        // Commit succeeded: discard undo logs (make changes permanent)
        #[cfg(feature = "triple-store")]
        self.commit_rdf_transaction(transaction_id);

        for graph_name in &touched {
            let store = self.resolve_store(graph_name);
            store.commit_transaction_properties(transaction_id);
        }

        // Flush buffered CDC events now that the transaction is committed.
        // All buffered events have PENDING epoch; assign the real commit_epoch.
        // Uses record_batch to acquire the write lock once per commit.
        #[cfg(feature = "cdc")]
        if let Some(ref pending) = self.cdc_pending_events {
            // Drain into a Vec first so the pending-buffer lock is released
            // before the CDC log is written.
            let events: Vec<crate::cdc::ChangeEvent> = pending.lock().drain(..).collect();
            self.cdc_log.record_batch(events.into_iter().map(|mut e| {
                e.epoch = commit_epoch;
                e
            }));
        }

        // Log transaction commit and epoch advance to WAL so that crash
        // recovery can identify committed transactions and their epoch
        // boundaries. Without these markers, WAL recovery discards all
        // records as uncommitted (fixes #252 for the crash scenario).
        #[cfg(feature = "wal")]
        if let Some(ref wal) = self.wal {
            use grafeo_storage::wal::WalRecord;
            // WAL failures are logged, not propagated: the in-memory commit
            // has already happened at this point.
            if let Err(e) = wal.log(&WalRecord::TransactionCommit { transaction_id }) {
                grafeo_warn!("Failed to log transaction commit to WAL: {}", e);
            }
            if let Err(e) = wal.log(&WalRecord::EpochAdvance {
                epoch: commit_epoch,
            }) {
                grafeo_warn!("Failed to log epoch advance to WAL: {}", e);
            }
        }

        // Sync epoch for all touched graphs so that convenience lookups
        // (edge_type, get_edge, get_node) can see versions at the latest epoch.
        let current_epoch = self.transaction_manager.current_epoch();
        for graph_name in &touched {
            let store = self.resolve_store(graph_name);
            store.sync_epoch(current_epoch);
        }

        // Reset read-only flag and clear savepoints.
        // touched_graphs was already emptied by mem::take above.
        *self.read_only_tx.lock() = self.db_read_only;
        self.savepoints.lock().clear();

        // Auto-GC: periodically prune old MVCC versions
        if self.gc_interval > 0 {
            // `fetch_add` returns the previous value; `+ 1` is this commit's count.
            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
            if count.is_multiple_of(self.gc_interval) {
                #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
                let gc_start = std::time::Instant::now();

                // Only the graphs touched by this transaction are pruned here.
                let min_epoch = self.transaction_manager.min_active_epoch();
                for graph_name in &touched {
                    let store = self.resolve_store(graph_name);
                    store.gc_versions(min_epoch);
                }
                self.transaction_manager.gc();

                #[cfg(feature = "metrics")]
                {
                    crate::metrics::record_metric!(self.metrics, gc_runs, inc);
                    #[cfg(not(target_arch = "wasm32"))]
                    {
                        let gc_duration_ms = gc_start.elapsed().as_secs_f64() * 1000.0;
                        crate::metrics::record_metric!(
                            self.metrics,
                            gc_duration,
                            observe gc_duration_ms
                        );
                    }
                }
            }
        }

        #[cfg(feature = "metrics")]
        {
            crate::metrics::record_metric!(self.metrics, tx_active, dec);
            crate::metrics::record_metric!(self.metrics, tx_committed, inc);
            #[cfg(not(target_arch = "wasm32"))]
            if let Some(start) = self.tx_start_time.lock().take() {
                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
            }
        }

        Ok(())
    }
3864
    /// Aborts the current transaction.
    ///
    /// Discards all changes since [`begin_transaction`](Self::begin_transaction).
    ///
    /// Inside a nested level (opened by beginning a transaction while one was
    /// already active) this only rolls back to that level's savepoint; the
    /// outer transaction stays open.
    ///
    /// # Errors
    ///
    /// Returns an error if no transaction is active, or if the transaction
    /// manager fails to abort the transaction.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use grafeo_engine::GrafeoDB;
    ///
    /// let db = GrafeoDB::new_in_memory();
    /// let mut session = db.session();
    ///
    /// session.begin_transaction()?;
    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
    /// session.rollback()?; // Insert is discarded
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(feature = "lpg")]
    pub fn rollback(&mut self) -> Result<()> {
        self.rollback_inner()
    }
3892
3893    /// Core rollback logic, usable from both `&mut self` and `&self` paths.
3894    #[cfg(feature = "lpg")]
3895    fn rollback_inner(&self) -> Result<()> {
3896        let _span = grafeo_debug_span!("grafeo::tx::rollback");
3897        // Nested transaction: rollback to the auto-savepoint.
3898        {
3899            let mut depth = self.transaction_nesting_depth.lock();
3900            if *depth > 0 {
3901                let sp_name = format!("_nested_tx_{depth}");
3902                *depth -= 1;
3903                drop(depth);
3904                return self.rollback_to_savepoint(&sp_name);
3905            }
3906        }
3907
3908        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3909            grafeo_common::utils::error::Error::Transaction(
3910                grafeo_common::utils::error::TransactionError::InvalidState(
3911                    "No active transaction".to_string(),
3912                ),
3913            )
3914        })?;
3915
3916        // Reset read-only flag
3917        *self.read_only_tx.lock() = self.db_read_only;
3918
3919        // Discard uncommitted versions in ALL touched LPG stores (cross-graph atomicity).
3920        let touched = self.touched_graphs.lock().clone();
3921        for graph_name in &touched {
3922            let store = self.resolve_store(graph_name);
3923            store.discard_uncommitted_versions(transaction_id);
3924        }
3925
3926        // Discard pending operations in the RDF store
3927        #[cfg(feature = "triple-store")]
3928        self.rollback_rdf_transaction(transaction_id);
3929
3930        // Discard buffered CDC events on rollback
3931        #[cfg(feature = "cdc")]
3932        if let Some(ref pending) = self.cdc_pending_events {
3933            pending.lock().clear();
3934        }
3935
3936        // Clear savepoints and touched graphs
3937        self.savepoints.lock().clear();
3938        self.touched_graphs.lock().clear();
3939
3940        // Mark transaction as aborted in the manager
3941        let result = self.transaction_manager.abort(transaction_id);
3942
3943        #[cfg(feature = "metrics")]
3944        if result.is_ok() {
3945            crate::metrics::record_metric!(self.metrics, tx_active, dec);
3946            crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
3947            #[cfg(not(target_arch = "wasm32"))]
3948            if let Some(start) = self.tx_start_time.lock().take() {
3949                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3950                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3951            }
3952        }
3953
3954        result
3955    }
3956
3957    /// Creates a named savepoint within the current transaction.
3958    ///
3959    /// The savepoint captures the current node/edge ID counters so that
3960    /// [`rollback_to_savepoint`](Self::rollback_to_savepoint) can discard
3961    /// entities created after this point.
3962    ///
3963    /// # Errors
3964    ///
3965    /// Returns an error if no transaction is active.
3966    #[cfg(feature = "lpg")]
3967    pub fn savepoint(&self, name: &str) -> Result<()> {
3968        let tx_id = self.current_transaction.lock().ok_or_else(|| {
3969            grafeo_common::utils::error::Error::Transaction(
3970                grafeo_common::utils::error::TransactionError::InvalidState(
3971                    "No active transaction".to_string(),
3972                ),
3973            )
3974        })?;
3975
3976        // Capture state for every graph touched so far.
3977        let touched = self.touched_graphs.lock().clone();
3978        let graph_snapshots: Vec<GraphSavepoint> = touched
3979            .iter()
3980            .map(|graph_name| {
3981                let store = self.resolve_store(graph_name);
3982                GraphSavepoint {
3983                    graph_name: graph_name.clone(),
3984                    next_node_id: store.peek_next_node_id(),
3985                    next_edge_id: store.peek_next_edge_id(),
3986                    undo_log_position: store.property_undo_log_position(tx_id),
3987                }
3988            })
3989            .collect();
3990
3991        self.savepoints.lock().push(SavepointState {
3992            name: name.to_string(),
3993            graph_snapshots,
3994            active_graph: self.current_graph.lock().clone(),
3995            #[cfg(feature = "cdc")]
3996            cdc_event_position: self
3997                .cdc_pending_events
3998                .as_ref()
3999                .map_or(0, |p| p.lock().len()),
4000        });
4001        Ok(())
4002    }
4003
4004    /// Rolls back to a named savepoint, undoing all writes made after it.
4005    ///
4006    /// The savepoint and any savepoints created after it are removed.
4007    /// Entities with IDs >= the savepoint snapshot are discarded.
4008    ///
4009    /// # Errors
4010    ///
4011    /// Returns an error if no transaction is active or the savepoint does not exist.
4012    #[cfg(feature = "lpg")]
4013    pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
4014        let transaction_id = self.current_transaction.lock().ok_or_else(|| {
4015            grafeo_common::utils::error::Error::Transaction(
4016                grafeo_common::utils::error::TransactionError::InvalidState(
4017                    "No active transaction".to_string(),
4018                ),
4019            )
4020        })?;
4021
4022        let mut savepoints = self.savepoints.lock();
4023
4024        // Find the savepoint by name (search from the end for nested savepoints)
4025        let pos = savepoints
4026            .iter()
4027            .rposition(|sp| sp.name == name)
4028            .ok_or_else(|| {
4029                grafeo_common::utils::error::Error::Transaction(
4030                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
4031                        "Savepoint '{name}' not found"
4032                    )),
4033                )
4034            })?;
4035
4036        let sp_state = savepoints[pos].clone();
4037
4038        // Remove this savepoint and all later ones
4039        savepoints.truncate(pos);
4040        drop(savepoints);
4041
4042        // Roll back each graph that was captured in the savepoint.
4043        for gs in &sp_state.graph_snapshots {
4044            let store = self.resolve_store(&gs.graph_name);
4045
4046            // Replay property/label undo entries recorded after the savepoint
4047            store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);
4048
4049            // Discard entities created after the savepoint
4050            let current_next_node = store.peek_next_node_id();
4051            let current_next_edge = store.peek_next_edge_id();
4052
4053            let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
4054                .map(NodeId::new)
4055                .collect();
4056            let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
4057                .map(EdgeId::new)
4058                .collect();
4059
4060            if !node_ids.is_empty() || !edge_ids.is_empty() {
4061                store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
4062            }
4063        }
4064
4065        // Also roll back any graphs that were touched AFTER the savepoint
4066        // but not captured in it. These need full discard since the savepoint
4067        // didn't include them.
4068        let touched = self.touched_graphs.lock().clone();
4069        for graph_name in &touched {
4070            let already_captured = sp_state
4071                .graph_snapshots
4072                .iter()
4073                .any(|gs| gs.graph_name == *graph_name);
4074            if !already_captured {
4075                let store = self.resolve_store(graph_name);
4076                store.discard_uncommitted_versions(transaction_id);
4077            }
4078        }
4079
4080        // Truncate CDC event buffer to the savepoint position.
4081        #[cfg(feature = "cdc")]
4082        if let Some(ref pending) = self.cdc_pending_events {
4083            pending.lock().truncate(sp_state.cdc_event_position);
4084        }
4085
4086        // Restore touched_graphs to only the graphs that were known at savepoint time.
4087        let mut touched = self.touched_graphs.lock();
4088        touched.clear();
4089        for gs in &sp_state.graph_snapshots {
4090            if !touched.contains(&gs.graph_name) {
4091                touched.push(gs.graph_name.clone());
4092            }
4093        }
4094
4095        Ok(())
4096    }
4097
4098    /// Releases (removes) a named savepoint without rolling back.
4099    ///
4100    /// # Errors
4101    ///
4102    /// Returns an error if no transaction is active or the savepoint does not exist.
4103    pub fn release_savepoint(&self, name: &str) -> Result<()> {
4104        let _tx_id = self.current_transaction.lock().ok_or_else(|| {
4105            grafeo_common::utils::error::Error::Transaction(
4106                grafeo_common::utils::error::TransactionError::InvalidState(
4107                    "No active transaction".to_string(),
4108                ),
4109            )
4110        })?;
4111
4112        let mut savepoints = self.savepoints.lock();
4113        let pos = savepoints
4114            .iter()
4115            .rposition(|sp| sp.name == name)
4116            .ok_or_else(|| {
4117                grafeo_common::utils::error::Error::Transaction(
4118                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
4119                        "Savepoint '{name}' not found"
4120                    )),
4121                )
4122            })?;
4123        savepoints.remove(pos);
4124        Ok(())
4125    }
4126
    /// Returns whether a transaction is active.
    ///
    /// This is `true` for as long as the session holds a transaction ID, i.e.
    /// from a successful begin until the outermost commit or rollback.
    #[must_use]
    pub fn in_transaction(&self) -> bool {
        self.current_transaction.lock().is_some()
    }
4132
    /// Returns the current transaction ID, if any.
    ///
    /// The ID is copied out from behind the lock, so the returned value is a
    /// snapshot and may be stale by the time the caller uses it.
    #[must_use]
    pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
        *self.current_transaction.lock()
    }
4138
    /// Returns a reference to the transaction manager used by this session
    /// to begin, commit and abort transactions.
    #[must_use]
    pub(crate) fn transaction_manager(&self) -> &TransactionManager {
        &self.transaction_manager
    }
4144
    /// Returns `(count_at_transaction_start, current_count)` for nodes.
    ///
    /// The first element is the node count recorded when the transaction
    /// began; the second is the active LPG store's node count right now.
    #[cfg(feature = "lpg")]
    #[must_use]
    pub(crate) fn node_count_delta(&self) -> (usize, usize) {
        (
            self.transaction_start_node_count.load(Ordering::Relaxed),
            self.active_lpg_store().node_count(),
        )
    }
4154
    /// Returns `(count_at_transaction_start, current_count)` for edges.
    ///
    /// The first element is the edge count recorded when the transaction
    /// began; the second is the active LPG store's edge count right now.
    #[cfg(feature = "lpg")]
    #[must_use]
    pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
        (
            self.transaction_start_edge_count.load(Ordering::Relaxed),
            self.active_lpg_store().edge_count(),
        )
    }
4164
    /// Prepares the current transaction for a two-phase commit.
    ///
    /// Returns a [`PreparedCommit`](crate::transaction::PreparedCommit) that
    /// lets you inspect pending changes and attach metadata before finalizing.
    /// The returned value borrows the session mutably, so the session cannot
    /// be used for other operations while the commit is pending.
    ///
    /// If the `PreparedCommit` is dropped without calling `commit()` or
    /// `abort()`, the transaction is automatically rolled back.
    ///
    /// # Errors
    ///
    /// Returns an error if no transaction is active.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use grafeo_engine::GrafeoDB;
    ///
    /// let db = GrafeoDB::new_in_memory();
    /// let mut session = db.session();
    ///
    /// session.begin_transaction()?;
    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
    ///
    /// let mut prepared = session.prepare_commit()?;
    /// println!("Nodes written: {}", prepared.info().nodes_written);
    /// prepared.set_metadata("audit_user", "admin");
    /// prepared.commit()?;
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(feature = "lpg")]
    pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
        // All validation and bookkeeping lives in `PreparedCommit::new`.
        crate::transaction::PreparedCommit::new(self)
    }
4202
    /// Sets auto-commit mode.
    ///
    /// When enabled, mutating queries executed outside an explicit
    /// transaction are wrapped in their own begin/commit.
    pub fn set_auto_commit(&mut self, auto_commit: bool) {
        self.auto_commit = auto_commit;
    }
4207
    /// Returns whether auto-commit is enabled for this session.
    #[must_use]
    pub fn auto_commit(&self) -> bool {
        self.auto_commit
    }
4213
4214    /// Returns `true` if auto-commit should wrap this execution.
4215    ///
4216    /// Auto-commit kicks in when: the session is in auto-commit mode,
4217    /// no explicit transaction is active, and the query mutates data.
4218    fn needs_auto_commit(&self, has_mutations: bool) -> bool {
4219        self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
4220    }
4221
4222    /// Wraps `body` in an automatic begin/commit when [`needs_auto_commit`]
4223    /// returns `true`. On error the transaction is rolled back.
4224    #[cfg(feature = "lpg")]
4225    fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
4226    where
4227        F: FnOnce() -> Result<QueryResult>,
4228    {
4229        if self.needs_auto_commit(has_mutations) {
4230            self.begin_transaction_inner(false, None)?;
4231            match body() {
4232                Ok(result) => {
4233                    self.commit_inner()?;
4234                    Ok(result)
4235                }
4236                Err(e) => {
4237                    let _ = self.rollback_inner();
4238                    Err(e)
4239                }
4240            }
4241        } else {
4242            body()
4243        }
4244    }
4245
    /// Non-LPG stub: no auto-commit wrapping (SPARQL UPDATE is atomic).
    ///
    /// Keeps the same signature as the `lpg` variant so call sites compile
    /// unchanged; `_has_mutations` is ignored and `body` is simply run.
    #[cfg(not(feature = "lpg"))]
    fn with_auto_commit<F>(&self, _has_mutations: bool, body: F) -> Result<QueryResult>
    where
        F: FnOnce() -> Result<QueryResult>,
    {
        body()
    }
4254
4255    /// Quick heuristic: returns `true` when the query text looks like it
4256    /// performs a mutation. Used by `_with_params` paths that go through the
4257    /// `QueryProcessor` (where the logical plan isn't available before
4258    /// execution). False negatives are harmless: the data just won't be
4259    /// auto-committed, which matches the prior behaviour.
4260    fn query_looks_like_mutation(query: &str) -> bool {
4261        let upper = query.to_ascii_uppercase();
4262        upper.contains("INSERT")
4263            || upper.contains("CREATE")
4264            || upper.contains("DELETE")
4265            || upper.contains("MERGE")
4266            || upper.contains("SET")
4267            || upper.contains("REMOVE")
4268            || upper.contains("DROP")
4269            || upper.contains("ALTER")
4270    }
4271
    /// Computes the wall-clock deadline for query execution.
    ///
    /// Returns `now + query_timeout` when a timeout is configured and `None`
    /// otherwise. On `wasm32` targets this always returns `None`.
    #[must_use]
    fn query_deadline(&self) -> Option<Instant> {
        #[cfg(not(target_arch = "wasm32"))]
        {
            self.query_timeout.map(|d| Instant::now() + d)
        }
        #[cfg(target_arch = "wasm32")]
        {
            // Touch the field so it does not count as unused on this target.
            let _ = &self.query_timeout;
            None
        }
    }
4285
    /// Creates an executor with deadline and timeout duration configured.
    ///
    /// The executor is built for the given output `columns`, with the
    /// deadline taken from `query_deadline()` and the session's
    /// `query_timeout` passed as the timeout duration.
    fn make_executor(&self, columns: Vec<String>) -> Executor {
        Executor::with_columns(columns)
            .with_deadline(self.query_deadline())
            .with_timeout_duration(self.query_timeout)
    }
4292
4293    /// Creates a per-query `OperatorMemoryContext` for memory-aware spilling.
4294    ///
4295    /// Returns `None` if the buffer manager is not configured or has no spill path,
4296    /// in which case operators will use row-count fallback thresholds.
4297    #[cfg(feature = "spill")]
4298    fn make_operator_memory_context(
4299        &self,
4300    ) -> Option<grafeo_core::execution::OperatorMemoryContext> {
4301        let bm = self.buffer_manager.as_ref()?;
4302        let spill_path = bm.config().spill_path.as_ref()?;
4303        // Per-query isolation: create a unique subdirectory
4304        let query_id = self
4305            .commit_counter
4306            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
4307        let query_dir = spill_path.join(format!("query_{query_id}"));
4308        let sm = std::sync::Arc::new(grafeo_core::execution::SpillManager::new(&query_dir).ok()?);
4309        Some(grafeo_core::execution::OperatorMemoryContext::new(
4310            std::sync::Arc::clone(bm),
4311            sm,
4312        ))
4313    }
4314
4315    /// Checks that a property value does not exceed the configured size limit.
4316    fn check_property_size(&self, key: &str, value: &Value) -> Result<()> {
4317        if let Some(limit) = self.max_property_size {
4318            let size = value.estimated_size_bytes();
4319            if size > limit {
4320                let limit_display = if limit >= 1024 * 1024 && limit % (1024 * 1024) == 0 {
4321                    format!("{} MiB", limit / (1024 * 1024))
4322                } else if limit >= 1024 && limit % 1024 == 0 {
4323                    format!("{} KiB", limit / 1024)
4324                } else {
4325                    format!("{limit} bytes")
4326                };
4327                return Err(grafeo_common::utils::error::Error::Query(
4328                    grafeo_common::utils::error::QueryError::new(
4329                        grafeo_common::utils::error::QueryErrorKind::Execution,
4330                        format!(
4331                            "Property '{key}' value exceeds maximum size of {limit_display} ({size} bytes)"
4332                        ),
4333                    )
4334                    .with_hint(
4335                        "Increase with Config::with_max_property_size() or disable with Config::without_max_property_size()".to_string(),
4336                    ),
4337                ));
4338            }
4339        }
4340        Ok(())
4341    }
4342
4343    /// Records query metrics for any language.
4344    ///
4345    /// Called after query execution to update counters, latency histogram,
4346    /// and per-language tracking. `elapsed_ms` should be `None` on WASM
4347    /// where `Instant` is unavailable.
4348    #[cfg(feature = "metrics")]
4349    fn record_query_metrics(
4350        &self,
4351        language: &str,
4352        elapsed_ms: Option<f64>,
4353        result: &Result<crate::database::QueryResult>,
4354    ) {
4355        use crate::metrics::record_metric;
4356
4357        record_metric!(self.metrics, query_count, inc);
4358        if let Some(ref reg) = self.metrics {
4359            reg.query_count_by_language.increment(language);
4360        }
4361        if let Some(ms) = elapsed_ms {
4362            record_metric!(self.metrics, query_latency, observe ms);
4363        }
4364        match result {
4365            Ok(r) => {
4366                let returned = r.rows.len() as u64;
4367                record_metric!(self.metrics, rows_returned, add returned);
4368                if let Some(scanned) = r.rows_scanned {
4369                    record_metric!(self.metrics, rows_scanned, add scanned);
4370                }
4371            }
4372            Err(e) => {
4373                record_metric!(self.metrics, query_errors, inc);
4374                // Detect timeout errors
4375                let msg = e.to_string();
4376                if msg.contains("exceeded timeout") {
4377                    record_metric!(self.metrics, query_timeouts, inc);
4378                }
4379            }
4380        }
4381    }
4382
4383    /// Evaluates a simple integer literal from a session parameter expression.
4384    #[cfg(feature = "gql")]
4385    fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
4386        use grafeo_adapters::query::gql::ast::{Expression, Literal};
4387        match expr {
4388            Expression::Literal(Literal::Integer(n)) => Some(*n),
4389            _ => None,
4390        }
4391    }
4392
4393    /// Returns the current transaction context for MVCC visibility.
4394    ///
4395    /// Returns `(viewing_epoch, transaction_id)` where:
4396    /// - `viewing_epoch` is the epoch at which to check version visibility
4397    /// - `transaction_id` is the current transaction ID (if in a transaction)
4398    #[must_use]
4399    fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
4400        // Time-travel override takes precedence (read-only, no tx context)
4401        if let Some(epoch) = *self.viewing_epoch_override.lock() {
4402            return (epoch, None);
4403        }
4404
4405        if let Some(transaction_id) = *self.current_transaction.lock() {
4406            // In a transaction: use the transaction's start epoch
4407            let epoch = self
4408                .transaction_manager
4409                .start_epoch(transaction_id)
4410                .unwrap_or_else(|| self.transaction_manager.current_epoch());
4411            (epoch, Some(transaction_id))
4412        } else {
4413            // No transaction: use current epoch
4414            (self.transaction_manager.current_epoch(), None)
4415        }
4416    }
4417
4418    /// Creates a planner with transaction context and constraint validator.
4419    ///
4420    /// The `store` parameter is the graph store to plan against (use
4421    /// `self.active_store()` for graph-aware execution).
4422    fn create_planner_for_store(
4423        &self,
4424        store: Arc<dyn GraphStore>,
4425        viewing_epoch: EpochId,
4426        transaction_id: Option<TransactionId>,
4427    ) -> crate::query::Planner {
4428        self.create_planner_for_store_with_read_only(store, viewing_epoch, transaction_id, false)
4429    }
4430
4431    fn create_planner_for_store_with_read_only(
4432        &self,
4433        store: Arc<dyn GraphStore>,
4434        viewing_epoch: EpochId,
4435        transaction_id: Option<TransactionId>,
4436        read_only: bool,
4437    ) -> crate::query::Planner {
4438        use crate::query::Planner;
4439        use grafeo_core::execution::operators::{LazyValue, SessionContext};
4440
4441        // Capture store reference for lazy introspection (only computed if info()/schema() called).
4442        let info_store = Arc::clone(&store);
4443        let schema_store = Arc::clone(&store);
4444
4445        let session_context = SessionContext {
4446            current_schema: self.current_schema(),
4447            current_graph: self.current_graph(),
4448            db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
4449            schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
4450        };
4451
4452        let write_store = self.active_write_store();
4453
4454        let mut planner = Planner::with_context(
4455            Arc::clone(&store),
4456            write_store,
4457            Arc::clone(&self.transaction_manager),
4458            transaction_id,
4459            viewing_epoch,
4460        )
4461        .with_factorized_execution(self.factorized_execution)
4462        .with_catalog(Arc::clone(&self.catalog))
4463        .with_session_context(session_context)
4464        .with_read_only(read_only);
4465
4466        // Attach the constraint validator for schema enforcement and property size limits
4467        let validator = CatalogConstraintValidator::new(Arc::clone(&self.catalog))
4468            .with_store(store)
4469            .with_max_property_size(self.max_property_size);
4470        planner = planner.with_validator(Arc::new(validator));
4471
4472        planner
4473    }
4474
4475    /// Builds a `Value::Map` for the `info()` introspection function.
4476    fn build_info_value(store: &dyn GraphStore) -> Value {
4477        use grafeo_common::types::PropertyKey;
4478        use std::collections::BTreeMap;
4479
4480        let mut map = BTreeMap::new();
4481        map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
4482        // reason: node/edge counts will not exceed i64::MAX
4483        #[allow(clippy::cast_possible_wrap)]
4484        let node_count = store.node_count() as i64;
4485        // reason: value is a small counter, well within i64::MAX
4486        #[allow(clippy::cast_possible_wrap)]
4487        let edge_count = store.edge_count() as i64;
4488        map.insert(PropertyKey::from("node_count"), Value::Int64(node_count));
4489        map.insert(PropertyKey::from("edge_count"), Value::Int64(edge_count));
4490        map.insert(
4491            PropertyKey::from("version"),
4492            Value::String(env!("CARGO_PKG_VERSION").into()),
4493        );
4494        Value::Map(map.into())
4495    }
4496
4497    /// Builds a `Value::Map` for the `schema()` introspection function.
4498    fn build_schema_value(store: &dyn GraphStore) -> Value {
4499        use grafeo_common::types::PropertyKey;
4500        use std::collections::BTreeMap;
4501
4502        let labels: Vec<Value> = store
4503            .all_labels()
4504            .into_iter()
4505            .map(|l| Value::String(l.into()))
4506            .collect();
4507        let edge_types: Vec<Value> = store
4508            .all_edge_types()
4509            .into_iter()
4510            .map(|t| Value::String(t.into()))
4511            .collect();
4512        let property_keys: Vec<Value> = store
4513            .all_property_keys()
4514            .into_iter()
4515            .map(|k| Value::String(k.into()))
4516            .collect();
4517
4518        let mut map = BTreeMap::new();
4519        map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
4520        map.insert(
4521            PropertyKey::from("edge_types"),
4522            Value::List(edge_types.into()),
4523        );
4524        map.insert(
4525            PropertyKey::from("property_keys"),
4526            Value::List(property_keys.into()),
4527        );
4528        Value::Map(map.into())
4529    }
4530
4531    /// Creates a node directly (bypassing query execution).
4532    ///
4533    /// This is a low-level API for testing and direct manipulation.
4534    /// If a transaction is active, the node will be versioned with the transaction ID.
4535    #[cfg(feature = "lpg")]
4536    pub fn create_node(&self, labels: &[&str]) -> NodeId {
4537        let (epoch, transaction_id) = self.get_transaction_context();
4538        self.active_lpg_store().create_node_versioned(
4539            labels,
4540            epoch,
4541            transaction_id.unwrap_or(TransactionId::SYSTEM),
4542        )
4543    }
4544
4545    /// Creates a node with properties.
4546    ///
4547    /// If a transaction is active, the node will be versioned with the transaction ID.
4548    ///
4549    /// # Errors
4550    ///
4551    /// Returns an error if any property value exceeds the configured `max_property_size`.
4552    #[cfg(feature = "lpg")]
4553    pub fn create_node_with_props<'a>(
4554        &self,
4555        labels: &[&str],
4556        properties: impl IntoIterator<Item = (&'a str, Value)>,
4557    ) -> Result<NodeId> {
4558        let props: Vec<(&str, Value)> = properties.into_iter().collect();
4559        for (key, value) in &props {
4560            self.check_property_size(key, value)?;
4561        }
4562        let (epoch, transaction_id) = self.get_transaction_context();
4563        Ok(self.active_lpg_store().create_node_with_props_versioned(
4564            labels,
4565            props,
4566            epoch,
4567            transaction_id.unwrap_or(TransactionId::SYSTEM),
4568        ))
4569    }
4570
4571    /// Creates an edge between two nodes.
4572    ///
4573    /// This is a low-level API for testing and direct manipulation.
4574    /// If a transaction is active, the edge will be versioned with the transaction ID.
4575    #[cfg(feature = "lpg")]
4576    pub fn create_edge(
4577        &self,
4578        src: NodeId,
4579        dst: NodeId,
4580        edge_type: &str,
4581    ) -> grafeo_common::types::EdgeId {
4582        let (epoch, transaction_id) = self.get_transaction_context();
4583        self.active_lpg_store().create_edge_versioned(
4584            src,
4585            dst,
4586            edge_type,
4587            epoch,
4588            transaction_id.unwrap_or(TransactionId::SYSTEM),
4589        )
4590    }
4591
4592    /// Creates an edge with properties within the active transaction context.
4593    ///
4594    /// # Errors
4595    ///
4596    /// Returns an error if any property value exceeds the configured `max_property_size`.
4597    #[cfg(feature = "lpg")]
4598    pub fn create_edge_with_props<'a>(
4599        &self,
4600        src: NodeId,
4601        dst: NodeId,
4602        edge_type: &str,
4603        properties: impl IntoIterator<Item = (&'a str, Value)>,
4604    ) -> Result<grafeo_common::types::EdgeId> {
4605        let props: Vec<(&str, Value)> = properties.into_iter().collect();
4606        for (key, value) in &props {
4607            self.check_property_size(key, value)?;
4608        }
4609        let (epoch, transaction_id) = self.get_transaction_context();
4610        let tid = transaction_id.unwrap_or(TransactionId::SYSTEM);
4611        let store = self.active_lpg_store();
4612        let eid = store.create_edge_versioned(src, dst, edge_type, epoch, tid);
4613        for (key, value) in props {
4614            store.set_edge_property_versioned(eid, key, value, tid);
4615        }
4616        Ok(eid)
4617    }
4618
4619    /// Sets a node property within the active transaction context.
4620    ///
4621    /// # Errors
4622    ///
4623    /// Returns an error if the value exceeds the configured `max_property_size`.
4624    #[cfg(feature = "lpg")]
4625    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) -> Result<()> {
4626        self.check_property_size(key, &value)?;
4627        let (_, transaction_id) = self.get_transaction_context();
4628        if let Some(tid) = transaction_id {
4629            self.active_lpg_store()
4630                .set_node_property_versioned(id, key, value, tid);
4631        } else {
4632            self.active_lpg_store().set_node_property(id, key, value);
4633        }
4634        Ok(())
4635    }
4636
4637    /// Sets an edge property within the active transaction context.
4638    ///
4639    /// # Errors
4640    ///
4641    /// Returns an error if the value exceeds the configured `max_property_size`.
4642    #[cfg(feature = "lpg")]
4643    pub fn set_edge_property(
4644        &self,
4645        id: grafeo_common::types::EdgeId,
4646        key: &str,
4647        value: Value,
4648    ) -> Result<()> {
4649        self.check_property_size(key, &value)?;
4650        let (_, transaction_id) = self.get_transaction_context();
4651        if let Some(tid) = transaction_id {
4652            self.active_lpg_store()
4653                .set_edge_property_versioned(id, key, value, tid);
4654        } else {
4655            self.active_lpg_store().set_edge_property(id, key, value);
4656        }
4657        Ok(())
4658    }
4659
4660    /// Deletes a node within the active transaction context.
4661    #[cfg(feature = "lpg")]
4662    pub fn delete_node(&self, id: NodeId) -> bool {
4663        let (epoch, transaction_id) = self.get_transaction_context();
4664        if let Some(tid) = transaction_id {
4665            self.active_lpg_store()
4666                .delete_node_versioned(id, epoch, tid)
4667        } else {
4668            self.active_lpg_store().delete_node(id)
4669        }
4670    }
4671
4672    /// Deletes an edge within the active transaction context.
4673    #[cfg(feature = "lpg")]
4674    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
4675        let (epoch, transaction_id) = self.get_transaction_context();
4676        if let Some(tid) = transaction_id {
4677            self.active_lpg_store()
4678                .delete_edge_versioned(id, epoch, tid)
4679        } else {
4680            self.active_lpg_store().delete_edge(id)
4681        }
4682    }
4683
4684    // =========================================================================
4685    // Direct Lookup APIs (bypass query planning for O(1) point reads)
4686    // =========================================================================
4687
4688    /// Gets a node by ID directly, bypassing query planning.
4689    ///
4690    /// This is the fastest way to retrieve a single node when you know its ID.
4691    /// Skips parsing, binding, optimization, and physical planning entirely.
4692    ///
4693    /// # Performance
4694    ///
4695    /// - Time complexity: O(1) average case
4696    /// - No lock contention (uses DashMap internally)
4697    /// - ~20-30x faster than equivalent MATCH query
4698    ///
4699    /// # Example
4700    ///
4701    /// ```no_run
4702    /// # use grafeo_engine::GrafeoDB;
4703    /// # let db = GrafeoDB::new_in_memory();
4704    /// let session = db.session();
4705    /// let node_id = session.create_node(&["Person"]);
4706    ///
4707    /// // Direct lookup - O(1), no query planning
4708    /// let node = session.get_node(node_id);
4709    /// assert!(node.is_some());
4710    /// ```
4711    #[cfg(feature = "lpg")]
4712    #[must_use]
4713    pub fn get_node(&self, id: NodeId) -> Option<Node> {
4714        let (epoch, transaction_id) = self.get_transaction_context();
4715        self.active_lpg_store().get_node_versioned(
4716            id,
4717            epoch,
4718            transaction_id.unwrap_or(TransactionId::SYSTEM),
4719        )
4720    }
4721
4722    /// Gets a single property from a node by ID, bypassing query planning.
4723    ///
4724    /// More efficient than `get_node()` when you only need one property,
4725    /// as it avoids loading the full node with all properties.
4726    ///
4727    /// # Performance
4728    ///
4729    /// - Time complexity: O(1) average case
4730    /// - No query planning overhead
4731    ///
4732    /// # Example
4733    ///
4734    /// ```no_run
4735    /// # use grafeo_engine::GrafeoDB;
4736    /// # use grafeo_common::types::Value;
4737    /// # let db = GrafeoDB::new_in_memory();
4738    /// let session = db.session();
4739    /// let id = session.create_node_with_props(&["Person"], [("name", "Alix".into())]).unwrap();
4740    ///
4741    /// // Direct property access - O(1)
4742    /// let name = session.get_node_property(id, "name");
4743    /// assert_eq!(name, Some(Value::String("Alix".into())));
4744    /// ```
4745    #[cfg(feature = "lpg")]
4746    #[must_use]
4747    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
4748        self.get_node(id)
4749            .and_then(|node| node.get_property(key).cloned())
4750    }
4751
4752    /// Gets an edge by ID directly, bypassing query planning.
4753    ///
4754    /// # Performance
4755    ///
4756    /// - Time complexity: O(1) average case
4757    /// - No lock contention
4758    #[cfg(feature = "lpg")]
4759    #[must_use]
4760    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
4761        let (epoch, transaction_id) = self.get_transaction_context();
4762        self.active_lpg_store().get_edge_versioned(
4763            id,
4764            epoch,
4765            transaction_id.unwrap_or(TransactionId::SYSTEM),
4766        )
4767    }
4768
4769    /// Gets outgoing neighbors of a node directly, bypassing query planning.
4770    ///
4771    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
4772    ///
4773    /// # Performance
4774    ///
4775    /// - Time complexity: O(degree) where degree is the number of outgoing edges
4776    /// - Uses adjacency index for direct access
4777    /// - ~10-20x faster than equivalent MATCH query
4778    ///
4779    /// # Example
4780    ///
4781    /// ```no_run
4782    /// # use grafeo_engine::GrafeoDB;
4783    /// # let db = GrafeoDB::new_in_memory();
4784    /// let session = db.session();
4785    /// let alix = session.create_node(&["Person"]);
4786    /// let gus = session.create_node(&["Person"]);
4787    /// session.create_edge(alix, gus, "KNOWS");
4788    ///
4789    /// // Direct neighbor lookup - O(degree)
4790    /// let neighbors = session.get_neighbors_outgoing(alix);
4791    /// assert_eq!(neighbors.len(), 1);
4792    /// assert_eq!(neighbors[0].0, gus);
4793    /// ```
4794    #[cfg(feature = "lpg")]
4795    #[must_use]
4796    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4797        self.active_lpg_store()
4798            .edges_from(node, Direction::Outgoing)
4799            .collect()
4800    }
4801
4802    /// Gets incoming neighbors of a node directly, bypassing query planning.
4803    ///
4804    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
4805    ///
4806    /// # Performance
4807    ///
4808    /// - Time complexity: O(degree) where degree is the number of incoming edges
4809    /// - Uses backward adjacency index for direct access
4810    #[cfg(feature = "lpg")]
4811    #[must_use]
4812    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4813        self.active_lpg_store()
4814            .edges_from(node, Direction::Incoming)
4815            .collect()
4816    }
4817
4818    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
4819    ///
4820    /// # Example
4821    ///
4822    /// ```no_run
4823    /// # use grafeo_engine::GrafeoDB;
4824    /// # let db = GrafeoDB::new_in_memory();
4825    /// # let session = db.session();
4826    /// # let alix = session.create_node(&["Person"]);
4827    /// let neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
4828    /// ```
4829    #[cfg(feature = "lpg")]
4830    #[must_use]
4831    pub fn get_neighbors_outgoing_by_type(
4832        &self,
4833        node: NodeId,
4834        edge_type: &str,
4835    ) -> Vec<(NodeId, EdgeId)> {
4836        self.active_lpg_store()
4837            .edges_from(node, Direction::Outgoing)
4838            .filter(|(_, edge_id)| {
4839                self.get_edge(*edge_id)
4840                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
4841            })
4842            .collect()
4843    }
4844
4845    /// Checks if a node exists, bypassing query planning.
4846    ///
4847    /// # Performance
4848    ///
4849    /// - Time complexity: O(1)
4850    /// - Fastest existence check available
4851    #[cfg(feature = "lpg")]
4852    #[must_use]
4853    pub fn node_exists(&self, id: NodeId) -> bool {
4854        self.get_node(id).is_some()
4855    }
4856
4857    /// Checks if an edge exists, bypassing query planning.
4858    #[cfg(feature = "lpg")]
4859    #[must_use]
4860    pub fn edge_exists(&self, id: EdgeId) -> bool {
4861        self.get_edge(id).is_some()
4862    }
4863
4864    /// Gets the degree (number of edges) of a node.
4865    ///
4866    /// Returns (outgoing_degree, incoming_degree).
4867    #[cfg(feature = "lpg")]
4868    #[must_use]
4869    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
4870        let active = self.active_lpg_store();
4871        let out = active.out_degree(node);
4872        let in_degree = active.in_degree(node);
4873        (out, in_degree)
4874    }
4875
4876    /// Batch lookup of multiple nodes by ID.
4877    ///
4878    /// More efficient than calling `get_node()` in a loop because it
4879    /// amortizes overhead.
4880    ///
4881    /// # Performance
4882    ///
4883    /// - Time complexity: O(n) where n is the number of IDs
4884    /// - Better cache utilization than individual lookups
4885    #[cfg(feature = "lpg")]
4886    #[must_use]
4887    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
4888        let (epoch, transaction_id) = self.get_transaction_context();
4889        let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
4890        let active = self.active_lpg_store();
4891        ids.iter()
4892            .map(|&id| active.get_node_versioned(id, epoch, tx))
4893            .collect()
4894    }
4895
4896    // ── Change Data Capture ─────────────────────────────────────────────
4897
4898    /// Returns the full change history for an entity (node or edge).
4899    ///
4900    /// # Errors
4901    ///
4902    /// Returns an authorization error if the session lacks read permission.
4903    #[cfg(feature = "cdc")]
4904    pub fn history(
4905        &self,
4906        entity_id: impl Into<crate::cdc::EntityId>,
4907    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4908        self.require_permission(crate::auth::StatementKind::Read)?;
4909        Ok(self.cdc_log.history(entity_id.into()))
4910    }
4911
4912    /// Returns change events for an entity since the given epoch.
4913    ///
4914    /// # Errors
4915    ///
4916    /// Returns an authorization error if the session lacks read permission.
4917    #[cfg(feature = "cdc")]
4918    pub fn history_since(
4919        &self,
4920        entity_id: impl Into<crate::cdc::EntityId>,
4921        since_epoch: EpochId,
4922    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4923        self.require_permission(crate::auth::StatementKind::Read)?;
4924        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
4925    }
4926
4927    /// Returns all change events across all entities in an epoch range.
4928    ///
4929    /// # Errors
4930    ///
4931    /// Returns an authorization error if the session lacks read permission.
4932    #[cfg(feature = "cdc")]
4933    pub fn changes_between(
4934        &self,
4935        start_epoch: EpochId,
4936        end_epoch: EpochId,
4937    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4938        self.require_permission(crate::auth::StatementKind::Read)?;
4939        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
4940    }
4941}
4942
4943impl Drop for Session {
4944    fn drop(&mut self) {
4945        // Auto-rollback any active transaction to prevent leaked MVCC state,
4946        // dangling write locks, and uncommitted versions lingering in the store.
4947        #[cfg(feature = "lpg")]
4948        if self.in_transaction() {
4949            let _ = self.rollback_inner();
4950        }
4951
4952        #[cfg(feature = "metrics")]
4953        if let Some(ref reg) = self.metrics {
4954            reg.session_active
4955                .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
4956        }
4957    }
4958}
4959
4960#[cfg(test)]
4961mod tests {
4962    use super::parse_default_literal;
4963    use crate::database::GrafeoDB;
4964    use grafeo_common::types::Value;
4965
4966    // -----------------------------------------------------------------------
4967    // parse_default_literal
4968    // -----------------------------------------------------------------------
4969
4970    #[test]
4971    fn parse_default_literal_null() {
4972        assert_eq!(parse_default_literal("null"), Value::Null);
4973        assert_eq!(parse_default_literal("NULL"), Value::Null);
4974        assert_eq!(parse_default_literal("Null"), Value::Null);
4975    }
4976
4977    #[test]
4978    fn parse_default_literal_bool() {
4979        assert_eq!(parse_default_literal("true"), Value::Bool(true));
4980        assert_eq!(parse_default_literal("TRUE"), Value::Bool(true));
4981        assert_eq!(parse_default_literal("false"), Value::Bool(false));
4982        assert_eq!(parse_default_literal("FALSE"), Value::Bool(false));
4983    }
4984
4985    #[test]
4986    fn parse_default_literal_string_single_quoted() {
4987        assert_eq!(
4988            parse_default_literal("'hello'"),
4989            Value::String("hello".into())
4990        );
4991    }
4992
4993    #[test]
4994    fn parse_default_literal_string_double_quoted() {
4995        assert_eq!(
4996            parse_default_literal("\"world\""),
4997            Value::String("world".into())
4998        );
4999    }
5000
5001    #[test]
5002    fn parse_default_literal_integer() {
5003        assert_eq!(parse_default_literal("42"), Value::Int64(42));
5004        assert_eq!(parse_default_literal("-7"), Value::Int64(-7));
5005        assert_eq!(parse_default_literal("0"), Value::Int64(0));
5006    }
5007
5008    #[test]
5009    fn parse_default_literal_float() {
5010        assert_eq!(parse_default_literal("9.81"), Value::Float64(9.81_f64));
5011        assert_eq!(parse_default_literal("-0.5"), Value::Float64(-0.5));
5012    }
5013
5014    #[test]
5015    fn parse_default_literal_fallback_string() {
5016        // Not a recognized literal, not quoted, not a number
5017        assert_eq!(
5018            parse_default_literal("some_identifier"),
5019            Value::String("some_identifier".into())
5020        );
5021    }
5022
5023    #[test]
5024    fn test_session_create_node() {
5025        let db = GrafeoDB::new_in_memory();
5026        let session = db.session();
5027
5028        let id = session.create_node(&["Person"]);
5029        assert!(id.is_valid());
5030        assert_eq!(db.node_count(), 1);
5031    }
5032
5033    #[test]
5034    fn test_session_transaction() {
5035        let db = GrafeoDB::new_in_memory();
5036        let mut session = db.session();
5037
5038        assert!(!session.in_transaction());
5039
5040        session.begin_transaction().unwrap();
5041        assert!(session.in_transaction());
5042
5043        session.commit().unwrap();
5044        assert!(!session.in_transaction());
5045    }
5046
5047    #[test]
5048    fn test_session_transaction_context() {
5049        let db = GrafeoDB::new_in_memory();
5050        let mut session = db.session();
5051
5052        // Without transaction - context should have current epoch and no transaction_id
5053        let (_epoch1, transaction_id1) = session.get_transaction_context();
5054        assert!(transaction_id1.is_none());
5055
5056        // Start a transaction
5057        session.begin_transaction().unwrap();
5058        let (epoch2, transaction_id2) = session.get_transaction_context();
5059        assert!(transaction_id2.is_some());
5060        // Transaction should have a valid epoch
5061        let _ = epoch2; // Use the variable
5062
5063        // Commit and verify
5064        session.commit().unwrap();
5065        let (epoch3, tx_id3) = session.get_transaction_context();
5066        assert!(tx_id3.is_none());
5067        // Epoch should have advanced after commit
5068        assert!(epoch3.as_u64() >= epoch2.as_u64());
5069    }
5070
5071    #[test]
5072    fn test_session_rollback() {
5073        let db = GrafeoDB::new_in_memory();
5074        let mut session = db.session();
5075
5076        session.begin_transaction().unwrap();
5077        session.rollback().unwrap();
5078        assert!(!session.in_transaction());
5079    }
5080
5081    #[test]
5082    fn test_session_rollback_discards_versions() {
5083        use grafeo_common::types::TransactionId;
5084
5085        let db = GrafeoDB::new_in_memory();
5086
5087        // Create a node outside of any transaction (at system level)
5088        let node_before = db.store().create_node(&["Person"]);
5089        assert!(node_before.is_valid());
5090        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
5091
5092        // Start a transaction
5093        let mut session = db.session();
5094        session.begin_transaction().unwrap();
5095        let transaction_id = session.current_transaction.lock().unwrap();
5096
5097        // Create a node versioned with the transaction's ID
5098        let epoch = db.store().current_epoch();
5099        let node_in_tx = db
5100            .store()
5101            .create_node_versioned(&["Person"], epoch, transaction_id);
5102        assert!(node_in_tx.is_valid());
5103
5104        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
5105        // non-versioned lookups like node_count(). Verify the node is visible
5106        // only through the owning transaction.
5107        assert_eq!(
5108            db.node_count(),
5109            1,
5110            "PENDING nodes should be invisible to non-versioned node_count()"
5111        );
5112        assert!(
5113            db.store()
5114                .get_node_versioned(node_in_tx, epoch, transaction_id)
5115                .is_some(),
5116            "Transaction node should be visible to its own transaction"
5117        );
5118
5119        // Rollback the transaction
5120        session.rollback().unwrap();
5121        assert!(!session.in_transaction());
5122
5123        // The node created in the transaction should be discarded
5124        // Only the first node should remain visible
5125        let count_after = db.node_count();
5126        assert_eq!(
5127            count_after, 1,
5128            "Rollback should discard uncommitted node, but got {count_after}"
5129        );
5130
5131        // The original node should still be accessible
5132        let current_epoch = db.store().current_epoch();
5133        assert!(
5134            db.store()
5135                .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
5136                .is_some(),
5137            "Original node should still exist"
5138        );
5139
5140        // The node created in the transaction should not be accessible
5141        assert!(
5142            db.store()
5143                .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
5144                .is_none(),
5145            "Transaction node should be gone"
5146        );
5147    }
5148
5149    #[test]
5150    fn test_session_create_node_in_transaction() {
5151        // Test that session.create_node() is transaction-aware
5152        let db = GrafeoDB::new_in_memory();
5153
5154        // Create a node outside of any transaction
5155        let node_before = db.create_node(&["Person"]);
5156        assert!(node_before.is_valid());
5157        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
5158
5159        // Start a transaction and create a node through the session
5160        let mut session = db.session();
5161        session.begin_transaction().unwrap();
5162        let transaction_id = session.current_transaction.lock().unwrap();
5163
5164        // Create a node through session.create_node() - should be versioned with tx
5165        let node_in_tx = session.create_node(&["Person"]);
5166        assert!(node_in_tx.is_valid());
5167
5168        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
5169        // non-versioned lookups. Verify the node is visible only to its own tx.
5170        assert_eq!(
5171            db.node_count(),
5172            1,
5173            "PENDING nodes should be invisible to non-versioned node_count()"
5174        );
5175        let epoch = db.store().current_epoch();
5176        assert!(
5177            db.store()
5178                .get_node_versioned(node_in_tx, epoch, transaction_id)
5179                .is_some(),
5180            "Transaction node should be visible to its own transaction"
5181        );
5182
5183        // Rollback the transaction
5184        session.rollback().unwrap();
5185
5186        // The node created via session.create_node() should be discarded
5187        let count_after = db.node_count();
5188        assert_eq!(
5189            count_after, 1,
5190            "Rollback should discard node created via session.create_node(), but got {count_after}"
5191        );
5192    }
5193
5194    #[test]
5195    fn test_session_create_node_with_props_in_transaction() {
5196        use grafeo_common::types::Value;
5197
5198        // Test that session.create_node_with_props() is transaction-aware
5199        let db = GrafeoDB::new_in_memory();
5200
5201        // Create a node outside of any transaction
5202        db.create_node(&["Person"]);
5203        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
5204
5205        // Start a transaction and create a node with properties
5206        let mut session = db.session();
5207        session.begin_transaction().unwrap();
5208        let transaction_id = session.current_transaction.lock().unwrap();
5209
5210        let node_in_tx = session
5211            .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5212            .unwrap();
5213        assert!(node_in_tx.is_valid());
5214
5215        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
5216        // non-versioned lookups. Verify the node is visible only to its own tx.
5217        assert_eq!(
5218            db.node_count(),
5219            1,
5220            "PENDING nodes should be invisible to non-versioned node_count()"
5221        );
5222        let epoch = db.store().current_epoch();
5223        assert!(
5224            db.store()
5225                .get_node_versioned(node_in_tx, epoch, transaction_id)
5226                .is_some(),
5227            "Transaction node should be visible to its own transaction"
5228        );
5229
5230        // Rollback the transaction
5231        session.rollback().unwrap();
5232
5233        // The node should be discarded
5234        let count_after = db.node_count();
5235        assert_eq!(
5236            count_after, 1,
5237            "Rollback should discard node created via session.create_node_with_props()"
5238        );
5239    }
5240
5241    #[cfg(feature = "gql")]
5242    mod gql_tests {
5243        use super::*;
5244
5245        #[test]
5246        fn test_gql_query_execution() {
5247            let db = GrafeoDB::new_in_memory();
5248            let session = db.session();
5249
5250            // Create some test data
5251            session.create_node(&["Person"]);
5252            session.create_node(&["Person"]);
5253            session.create_node(&["Animal"]);
5254
5255            // Execute a GQL query
5256            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
5257
5258            // Should return 2 Person nodes
5259            assert_eq!(result.row_count(), 2);
5260            assert_eq!(result.column_count(), 1);
5261            assert_eq!(result.columns[0], "n");
5262        }
5263
5264        #[test]
5265        fn test_gql_empty_result() {
5266            let db = GrafeoDB::new_in_memory();
5267            let session = db.session();
5268
5269            // No data in database
5270            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
5271
5272            assert_eq!(result.row_count(), 0);
5273        }
5274
5275        #[test]
5276        fn test_gql_parse_error() {
5277            let db = GrafeoDB::new_in_memory();
5278            let session = db.session();
5279
5280            // Invalid GQL syntax
5281            let result = session.execute("MATCH (n RETURN n");
5282
5283            assert!(result.is_err());
5284        }
5285
5286        #[test]
5287        fn test_gql_relationship_traversal() {
5288            let db = GrafeoDB::new_in_memory();
5289            let session = db.session();
5290
5291            // Create a graph: Alix -> Gus, Alix -> Vincent
5292            let alix = session.create_node(&["Person"]);
5293            let gus = session.create_node(&["Person"]);
5294            let vincent = session.create_node(&["Person"]);
5295
5296            session.create_edge(alix, gus, "KNOWS");
5297            session.create_edge(alix, vincent, "KNOWS");
5298
5299            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
5300            let result = session
5301                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
5302                .unwrap();
5303
5304            // Should return 2 rows (Alix->Gus, Alix->Vincent)
5305            assert_eq!(result.row_count(), 2);
5306            assert_eq!(result.column_count(), 2);
5307            assert_eq!(result.columns[0], "a");
5308            assert_eq!(result.columns[1], "b");
5309        }
5310
5311        #[test]
5312        fn test_gql_relationship_with_type_filter() {
5313            let db = GrafeoDB::new_in_memory();
5314            let session = db.session();
5315
5316            // Create a graph: Alix -KNOWS-> Gus, Alix -WORKS_WITH-> Vincent
5317            let alix = session.create_node(&["Person"]);
5318            let gus = session.create_node(&["Person"]);
5319            let vincent = session.create_node(&["Person"]);
5320
5321            session.create_edge(alix, gus, "KNOWS");
5322            session.create_edge(alix, vincent, "WORKS_WITH");
5323
5324            // Query only KNOWS relationships
5325            let result = session
5326                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
5327                .unwrap();
5328
5329            // Should return only 1 row (Alix->Gus)
5330            assert_eq!(result.row_count(), 1);
5331        }
5332
5333        #[test]
5334        fn test_gql_semantic_error_undefined_variable() {
5335            let db = GrafeoDB::new_in_memory();
5336            let session = db.session();
5337
5338            // Reference undefined variable 'x' in RETURN
5339            let result = session.execute("MATCH (n:Person) RETURN x");
5340
5341            // Should fail with semantic error
5342            assert!(result.is_err());
5343            let Err(err) = result else {
5344                panic!("Expected error")
5345            };
5346            assert!(
5347                err.to_string().contains("Undefined variable"),
5348                "Expected undefined variable error, got: {}",
5349                err
5350            );
5351        }
5352
5353        #[test]
5354        fn test_gql_where_clause_property_filter() {
5355            use grafeo_common::types::Value;
5356
5357            let db = GrafeoDB::new_in_memory();
5358            let session = db.session();
5359
5360            // Create people with ages
5361            session
5362                .create_node_with_props(&["Person"], [("age", Value::Int64(25))])
5363                .unwrap();
5364            session
5365                .create_node_with_props(&["Person"], [("age", Value::Int64(35))])
5366                .unwrap();
5367            session
5368                .create_node_with_props(&["Person"], [("age", Value::Int64(45))])
5369                .unwrap();
5370
5371            // Query with WHERE clause: age > 30
5372            let result = session
5373                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
5374                .unwrap();
5375
5376            // Should return 2 people (ages 35 and 45)
5377            assert_eq!(result.row_count(), 2);
5378        }
5379
5380        #[test]
5381        fn test_gql_where_clause_equality() {
5382            use grafeo_common::types::Value;
5383
5384            let db = GrafeoDB::new_in_memory();
5385            let session = db.session();
5386
5387            // Create people with names
5388            session
5389                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5390                .unwrap();
5391            session
5392                .create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))])
5393                .unwrap();
5394            session
5395                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5396                .unwrap();
5397
5398            // Query with WHERE clause: name = "Alix"
5399            let result = session
5400                .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
5401                .unwrap();
5402
5403            // Should return 2 people named Alix
5404            assert_eq!(result.row_count(), 2);
5405        }
5406
5407        #[test]
5408        fn test_gql_return_property_access() {
5409            use grafeo_common::types::Value;
5410
5411            let db = GrafeoDB::new_in_memory();
5412            let session = db.session();
5413
5414            // Create people with names and ages
5415            session
5416                .create_node_with_props(
5417                    &["Person"],
5418                    [
5419                        ("name", Value::String("Alix".into())),
5420                        ("age", Value::Int64(30)),
5421                    ],
5422                )
5423                .unwrap();
5424            session
5425                .create_node_with_props(
5426                    &["Person"],
5427                    [
5428                        ("name", Value::String("Gus".into())),
5429                        ("age", Value::Int64(25)),
5430                    ],
5431                )
5432                .unwrap();
5433
5434            // Query returning properties
5435            let result = session
5436                .execute("MATCH (n:Person) RETURN n.name, n.age")
5437                .unwrap();
5438
5439            // Should return 2 rows with name and age columns
5440            assert_eq!(result.row_count(), 2);
5441            assert_eq!(result.column_count(), 2);
5442            assert_eq!(result.columns[0], "n.name");
5443            assert_eq!(result.columns[1], "n.age");
5444
5445            // Check that we get actual values
5446            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
5447            assert!(names.contains(&&Value::String("Alix".into())));
5448            assert!(names.contains(&&Value::String("Gus".into())));
5449        }
5450
5451        #[test]
5452        fn test_gql_return_mixed_expressions() {
5453            use grafeo_common::types::Value;
5454
5455            let db = GrafeoDB::new_in_memory();
5456            let session = db.session();
5457
5458            // Create a person
5459            session
5460                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5461                .unwrap();
5462
5463            // Query returning both node and property
5464            let result = session
5465                .execute("MATCH (n:Person) RETURN n, n.name")
5466                .unwrap();
5467
5468            assert_eq!(result.row_count(), 1);
5469            assert_eq!(result.column_count(), 2);
5470            assert_eq!(result.columns[0], "n");
5471            assert_eq!(result.columns[1], "n.name");
5472
5473            // Second column should be the name
5474            assert_eq!(result.rows[0][1], Value::String("Alix".into()));
5475        }
5476    }
5477
5478    #[cfg(feature = "cypher")]
5479    mod cypher_tests {
5480        use super::*;
5481
5482        #[test]
5483        fn test_cypher_query_execution() {
5484            let db = GrafeoDB::new_in_memory();
5485            let session = db.session();
5486
5487            // Create some test data
5488            session.create_node(&["Person"]);
5489            session.create_node(&["Person"]);
5490            session.create_node(&["Animal"]);
5491
5492            // Execute a Cypher query
5493            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
5494
5495            // Should return 2 Person nodes
5496            assert_eq!(result.row_count(), 2);
5497            assert_eq!(result.column_count(), 1);
5498            assert_eq!(result.columns[0], "n");
5499        }
5500
5501        #[test]
5502        fn test_cypher_empty_result() {
5503            let db = GrafeoDB::new_in_memory();
5504            let session = db.session();
5505
5506            // No data in database
5507            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
5508
5509            assert_eq!(result.row_count(), 0);
5510        }
5511
5512        #[test]
5513        fn test_cypher_parse_error() {
5514            let db = GrafeoDB::new_in_memory();
5515            let session = db.session();
5516
5517            // Invalid Cypher syntax
5518            let result = session.execute_cypher("MATCH (n RETURN n");
5519
5520            assert!(result.is_err());
5521        }
5522    }
5523
5524    // ==================== Direct Lookup API Tests ====================
5525
5526    mod direct_lookup_tests {
5527        use super::*;
5528        use grafeo_common::types::Value;
5529
5530        #[test]
5531        fn test_get_node() {
5532            let db = GrafeoDB::new_in_memory();
5533            let session = db.session();
5534
5535            let id = session.create_node(&["Person"]);
5536            let node = session.get_node(id);
5537
5538            assert!(node.is_some());
5539            let node = node.unwrap();
5540            assert_eq!(node.id, id);
5541        }
5542
5543        #[test]
5544        fn test_get_node_not_found() {
5545            use grafeo_common::types::NodeId;
5546
5547            let db = GrafeoDB::new_in_memory();
5548            let session = db.session();
5549
5550            // Try to get a non-existent node
5551            let node = session.get_node(NodeId::new(9999));
5552            assert!(node.is_none());
5553        }
5554
5555        #[test]
5556        fn test_get_node_property() {
5557            let db = GrafeoDB::new_in_memory();
5558            let session = db.session();
5559
5560            let id = session
5561                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))])
5562                .unwrap();
5563
5564            let name = session.get_node_property(id, "name");
5565            assert_eq!(name, Some(Value::String("Alix".into())));
5566
5567            // Non-existent property
5568            let missing = session.get_node_property(id, "missing");
5569            assert!(missing.is_none());
5570        }
5571
5572        #[test]
5573        fn test_get_edge() {
5574            let db = GrafeoDB::new_in_memory();
5575            let session = db.session();
5576
5577            let alix = session.create_node(&["Person"]);
5578            let gus = session.create_node(&["Person"]);
5579            let edge_id = session.create_edge(alix, gus, "KNOWS");
5580
5581            let edge = session.get_edge(edge_id);
5582            assert!(edge.is_some());
5583            let edge = edge.unwrap();
5584            assert_eq!(edge.id, edge_id);
5585            assert_eq!(edge.src, alix);
5586            assert_eq!(edge.dst, gus);
5587        }
5588
5589        #[test]
5590        fn test_get_edge_not_found() {
5591            use grafeo_common::types::EdgeId;
5592
5593            let db = GrafeoDB::new_in_memory();
5594            let session = db.session();
5595
5596            let edge = session.get_edge(EdgeId::new(9999));
5597            assert!(edge.is_none());
5598        }
5599
5600        #[test]
5601        fn test_get_neighbors_outgoing() {
5602            let db = GrafeoDB::new_in_memory();
5603            let session = db.session();
5604
5605            let alix = session.create_node(&["Person"]);
5606            let gus = session.create_node(&["Person"]);
5607            let harm = session.create_node(&["Person"]);
5608
5609            session.create_edge(alix, gus, "KNOWS");
5610            session.create_edge(alix, harm, "KNOWS");
5611
5612            let neighbors = session.get_neighbors_outgoing(alix);
5613            assert_eq!(neighbors.len(), 2);
5614
5615            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5616            assert!(neighbor_ids.contains(&gus));
5617            assert!(neighbor_ids.contains(&harm));
5618        }
5619
5620        #[test]
5621        fn test_get_neighbors_incoming() {
5622            let db = GrafeoDB::new_in_memory();
5623            let session = db.session();
5624
5625            let alix = session.create_node(&["Person"]);
5626            let gus = session.create_node(&["Person"]);
5627            let harm = session.create_node(&["Person"]);
5628
5629            session.create_edge(gus, alix, "KNOWS");
5630            session.create_edge(harm, alix, "KNOWS");
5631
5632            let neighbors = session.get_neighbors_incoming(alix);
5633            assert_eq!(neighbors.len(), 2);
5634
5635            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5636            assert!(neighbor_ids.contains(&gus));
5637            assert!(neighbor_ids.contains(&harm));
5638        }
5639
5640        #[test]
5641        fn test_get_neighbors_outgoing_by_type() {
5642            let db = GrafeoDB::new_in_memory();
5643            let session = db.session();
5644
5645            let alix = session.create_node(&["Person"]);
5646            let gus = session.create_node(&["Person"]);
5647            let company = session.create_node(&["Company"]);
5648
5649            session.create_edge(alix, gus, "KNOWS");
5650            session.create_edge(alix, company, "WORKS_AT");
5651
5652            let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
5653            assert_eq!(knows_neighbors.len(), 1);
5654            assert_eq!(knows_neighbors[0].0, gus);
5655
5656            let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
5657            assert_eq!(works_neighbors.len(), 1);
5658            assert_eq!(works_neighbors[0].0, company);
5659
5660            // No edges of this type
5661            let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
5662            assert!(no_neighbors.is_empty());
5663        }
5664
5665        #[test]
5666        fn test_node_exists() {
5667            use grafeo_common::types::NodeId;
5668
5669            let db = GrafeoDB::new_in_memory();
5670            let session = db.session();
5671
5672            let id = session.create_node(&["Person"]);
5673
5674            assert!(session.node_exists(id));
5675            assert!(!session.node_exists(NodeId::new(9999)));
5676        }
5677
5678        #[test]
5679        fn test_edge_exists() {
5680            use grafeo_common::types::EdgeId;
5681
5682            let db = GrafeoDB::new_in_memory();
5683            let session = db.session();
5684
5685            let alix = session.create_node(&["Person"]);
5686            let gus = session.create_node(&["Person"]);
5687            let edge_id = session.create_edge(alix, gus, "KNOWS");
5688
5689            assert!(session.edge_exists(edge_id));
5690            assert!(!session.edge_exists(EdgeId::new(9999)));
5691        }
5692
5693        #[test]
5694        fn test_get_degree() {
5695            let db = GrafeoDB::new_in_memory();
5696            let session = db.session();
5697
5698            let alix = session.create_node(&["Person"]);
5699            let gus = session.create_node(&["Person"]);
5700            let harm = session.create_node(&["Person"]);
5701
5702            // Alix knows Gus and Harm (2 outgoing)
5703            session.create_edge(alix, gus, "KNOWS");
5704            session.create_edge(alix, harm, "KNOWS");
5705            // Gus knows Alix (1 incoming for Alix)
5706            session.create_edge(gus, alix, "KNOWS");
5707
5708            let (out_degree, in_degree) = session.get_degree(alix);
5709            assert_eq!(out_degree, 2);
5710            assert_eq!(in_degree, 1);
5711
5712            // Node with no edges
5713            let lonely = session.create_node(&["Person"]);
5714            let (out, in_deg) = session.get_degree(lonely);
5715            assert_eq!(out, 0);
5716            assert_eq!(in_deg, 0);
5717        }
5718
5719        #[test]
5720        fn test_get_nodes_batch() {
5721            let db = GrafeoDB::new_in_memory();
5722            let session = db.session();
5723
5724            let alix = session.create_node(&["Person"]);
5725            let gus = session.create_node(&["Person"]);
5726            let harm = session.create_node(&["Person"]);
5727
5728            let nodes = session.get_nodes_batch(&[alix, gus, harm]);
5729            assert_eq!(nodes.len(), 3);
5730            assert!(nodes[0].is_some());
5731            assert!(nodes[1].is_some());
5732            assert!(nodes[2].is_some());
5733
5734            // With non-existent node
5735            use grafeo_common::types::NodeId;
5736            let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
5737            assert_eq!(nodes_with_missing.len(), 3);
5738            assert!(nodes_with_missing[0].is_some());
5739            assert!(nodes_with_missing[1].is_none()); // Missing node
5740            assert!(nodes_with_missing[2].is_some());
5741        }
5742
5743        #[test]
5744        fn test_auto_commit_setting() {
5745            let db = GrafeoDB::new_in_memory();
5746            let mut session = db.session();
5747
5748            // Default is auto-commit enabled
5749            assert!(session.auto_commit());
5750
5751            session.set_auto_commit(false);
5752            assert!(!session.auto_commit());
5753
5754            session.set_auto_commit(true);
5755            assert!(session.auto_commit());
5756        }
5757
5758        #[test]
5759        fn test_transaction_double_begin_nests() {
5760            let db = GrafeoDB::new_in_memory();
5761            let mut session = db.session();
5762
5763            session.begin_transaction().unwrap();
5764            // Second begin_transaction creates a nested transaction (auto-savepoint)
5765            let result = session.begin_transaction();
5766            assert!(result.is_ok());
5767            // Commit the inner (releases savepoint)
5768            session.commit().unwrap();
5769            // Commit the outer
5770            session.commit().unwrap();
5771        }
5772
5773        #[test]
5774        fn test_commit_without_transaction_error() {
5775            let db = GrafeoDB::new_in_memory();
5776            let mut session = db.session();
5777
5778            let result = session.commit();
5779            assert!(result.is_err());
5780        }
5781
5782        #[test]
5783        fn test_rollback_without_transaction_error() {
5784            let db = GrafeoDB::new_in_memory();
5785            let mut session = db.session();
5786
5787            let result = session.rollback();
5788            assert!(result.is_err());
5789        }
5790
5791        #[test]
5792        fn test_create_edge_in_transaction() {
5793            let db = GrafeoDB::new_in_memory();
5794            let mut session = db.session();
5795
5796            // Create nodes outside transaction
5797            let alix = session.create_node(&["Person"]);
5798            let gus = session.create_node(&["Person"]);
5799
5800            // Create edge in transaction
5801            session.begin_transaction().unwrap();
5802            let edge_id = session.create_edge(alix, gus, "KNOWS");
5803
5804            // Edge should be visible in the transaction
5805            assert!(session.edge_exists(edge_id));
5806
5807            // Commit
5808            session.commit().unwrap();
5809
5810            // Edge should still be visible
5811            assert!(session.edge_exists(edge_id));
5812        }
5813
5814        #[test]
5815        fn test_neighbors_empty_node() {
5816            let db = GrafeoDB::new_in_memory();
5817            let session = db.session();
5818
5819            let lonely = session.create_node(&["Person"]);
5820
5821            assert!(session.get_neighbors_outgoing(lonely).is_empty());
5822            assert!(session.get_neighbors_incoming(lonely).is_empty());
5823            assert!(
5824                session
5825                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
5826                    .is_empty()
5827            );
5828        }
5829    }
5830
5831    #[test]
5832    fn test_auto_gc_triggers_on_commit_interval() {
5833        use crate::config::Config;
5834
5835        let config = Config::in_memory().with_gc_interval(2);
5836        let db = GrafeoDB::with_config(config).unwrap();
5837        let mut session = db.session();
5838
5839        // First commit: counter = 1, no GC (not a multiple of 2)
5840        session.begin_transaction().unwrap();
5841        session.create_node(&["A"]);
5842        session.commit().unwrap();
5843
5844        // Second commit: counter = 2, GC should trigger (multiple of 2)
5845        session.begin_transaction().unwrap();
5846        session.create_node(&["B"]);
5847        session.commit().unwrap();
5848
5849        // Verify the database is still functional after GC
5850        assert_eq!(db.node_count(), 2);
5851    }
5852
5853    #[test]
5854    fn test_query_timeout_config_propagates_to_session() {
5855        use crate::config::Config;
5856        use std::time::Duration;
5857
5858        let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
5859        let db = GrafeoDB::with_config(config).unwrap();
5860        let session = db.session();
5861
5862        // Verify the session has a query deadline (timeout was set)
5863        assert!(session.query_deadline().is_some());
5864    }
5865
5866    #[test]
5867    fn test_default_query_timeout_returns_deadline() {
5868        let db = GrafeoDB::new_in_memory();
5869        let session = db.session();
5870
5871        // Default config has 30s timeout
5872        assert!(session.query_deadline().is_some());
5873    }
5874
5875    #[test]
5876    fn test_no_query_timeout_returns_no_deadline() {
5877        use crate::config::Config;
5878
5879        let config = Config::in_memory().without_query_timeout();
5880        let db = GrafeoDB::with_config(config).unwrap();
5881        let session = db.session();
5882
5883        assert!(session.query_deadline().is_none());
5884    }
5885
5886    #[test]
5887    fn test_graph_model_accessor() {
5888        use crate::config::GraphModel;
5889
5890        let db = GrafeoDB::new_in_memory();
5891        let session = db.session();
5892
5893        assert_eq!(session.graph_model(), GraphModel::Lpg);
5894    }
5895
5896    #[test]
5897    fn test_reject_oversized_property() {
5898        use crate::config::Config;
5899
5900        let config = Config::in_memory().with_max_property_size(100);
5901        let db = GrafeoDB::with_config(config).unwrap();
5902        let session = db.session();
5903
5904        let node = session.create_node(&["Test"]);
5905
5906        // Small property should succeed
5907        session
5908            .set_node_property(node, "small", Value::from("hello"))
5909            .unwrap();
5910
5911        // Large property should be rejected
5912        let big = "x".repeat(200);
5913        let result = session.set_node_property(node, "big", Value::from(big.as_str()));
5914        assert!(result.is_err());
5915        let err = result.unwrap_err().to_string();
5916        assert!(
5917            err.contains("exceeds maximum size"),
5918            "Expected size error, got: {err}"
5919        );
5920    }
5921
5922    #[test]
5923    fn test_no_property_size_limit() {
5924        use crate::config::Config;
5925
5926        let config = Config::in_memory().without_max_property_size();
5927        let db = GrafeoDB::with_config(config).unwrap();
5928        let session = db.session();
5929
5930        let node = session.create_node(&["Test"]);
5931
5932        // Even large properties should succeed with no limit
5933        let big = "x".repeat(10_000);
5934        session
5935            .set_node_property(node, "big", Value::from(big.as_str()))
5936            .unwrap();
5937    }
5938
/// A database built on top of an externally supplied store can be written to
/// and queried through a regular session.
#[cfg(feature = "gql")]
#[test]
fn test_external_store_session() {
    use grafeo_core::graph::GraphStoreMut;
    use grafeo_core::graph::lpg::LpgStore;
    use std::sync::Arc;

    let settings = crate::config::Config::in_memory();
    // Hand the store over as a trait object, the way an external caller would.
    let backing: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
    let database = GrafeoDB::with_store(backing, settings).unwrap();

    let mut sess = database.session();

    // INSERT and MATCH run inside one explicit transaction so that they share
    // the same transaction context. With PENDING epochs, uncommitted versions
    // are only visible to the owning transaction.
    sess.begin_transaction().unwrap();
    sess.execute("INSERT (:Test {name: 'hello'})").unwrap();

    // The freshly inserted node must be readable within that transaction.
    let matched = sess.execute("MATCH (n:Test) RETURN n.name").unwrap();
    assert_eq!(matched.row_count(), 1);

    sess.commit().unwrap();
}
5964
5965    // ==================== Session Command Tests ====================
5966
/// Tests for session-level GQL commands: graph selection, session state,
/// graph DDL, transaction control statements, and the `SHOW` family.
#[cfg(feature = "gql")]
mod session_command_tests {
    use super::*;
    use grafeo_common::types::Value;

    /// `USE GRAPH` on an existing graph makes it the session's current graph.
    #[test]
    fn test_use_graph_sets_current_graph() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        // The graph is created first, then selected.
        for statement in ["CREATE GRAPH mydb", "USE GRAPH mydb"] {
            sess.execute(statement).unwrap();
        }

        assert_eq!(sess.current_graph().as_deref(), Some("mydb"));
    }

    /// `USE GRAPH` on an unknown graph fails with a "does not exist" error.
    #[test]
    fn test_use_graph_nonexistent_errors() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        let outcome = sess.execute("USE GRAPH doesnotexist");
        assert!(outcome.is_err());

        let err = outcome.unwrap_err().to_string();
        assert!(
            err.contains("does not exist"),
            "Expected 'does not exist' error, got: {err}"
        );
    }

    /// The graph named `default` can be selected without creating it.
    #[test]
    fn test_use_graph_default_always_valid() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        // No CREATE GRAPH beforehand: "default" is expected to be valid anyway.
        sess.execute("USE GRAPH default").unwrap();

        assert_eq!(sess.current_graph().as_deref(), Some("default"));
    }

    /// `SESSION SET GRAPH` switches the current graph to an existing graph.
    #[test]
    fn test_session_set_graph() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        for statement in ["CREATE GRAPH analytics", "SESSION SET GRAPH analytics"] {
            sess.execute(statement).unwrap();
        }

        assert_eq!(sess.current_graph().as_deref(), Some("analytics"));
    }

    /// `SESSION SET GRAPH` on an unknown graph is an error.
    #[test]
    fn test_session_set_graph_nonexistent_errors() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        assert!(sess.execute("SESSION SET GRAPH nosuchgraph").is_err());
    }

    /// `SESSION SET TIME ZONE` stores the zone and later calls overwrite it.
    #[test]
    fn test_session_set_time_zone() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        // A new session starts without a time zone.
        assert!(sess.time_zone().is_none());

        sess.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
        assert_eq!(sess.time_zone().as_deref(), Some("UTC"));

        sess.execute("SESSION SET TIME ZONE 'America/New_York'")
            .unwrap();
        assert_eq!(sess.time_zone().as_deref(), Some("America/New_York"));
    }

    /// `SESSION SET PARAMETER` registers the named parameter on the session.
    #[test]
    fn test_session_set_parameter() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        sess.execute("SESSION SET PARAMETER $timeout = 30").unwrap();

        // Only presence is checked: the stored value is Null for now, since
        // expression evaluation is not yet wired up.
        let stored = sess.get_parameter("timeout");
        assert!(stored.is_some());
    }

    /// `SESSION RESET` wipes the current graph, the time zone and parameters.
    #[test]
    fn test_session_reset_clears_all_state() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        // Populate every piece of session state.
        let setup = [
            "CREATE GRAPH analytics",
            "SESSION SET GRAPH analytics",
            "SESSION SET TIME ZONE 'UTC'",
            "SESSION SET PARAMETER $limit = 100",
        ];
        for statement in setup {
            sess.execute(statement).unwrap();
        }

        // Confirm the setup actually took effect before resetting.
        assert!(sess.current_graph().is_some());
        assert!(sess.time_zone().is_some());
        assert!(sess.get_parameter("limit").is_some());

        sess.execute("SESSION RESET").unwrap();

        // Everything is back to its unset state.
        assert!(sess.current_graph().is_none());
        assert!(sess.time_zone().is_none());
        assert!(sess.get_parameter("limit").is_none());
    }

    /// `SESSION CLOSE` clears the current graph and the time zone.
    #[test]
    fn test_session_close_clears_state() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        let setup = [
            "CREATE GRAPH analytics",
            "SESSION SET GRAPH analytics",
            "SESSION SET TIME ZONE 'UTC'",
        ];
        for statement in setup {
            sess.execute(statement).unwrap();
        }

        sess.execute("SESSION CLOSE").unwrap();

        assert!(sess.current_graph().is_none());
        assert!(sess.time_zone().is_none());
    }

    /// A graph created with `CREATE GRAPH` can be selected right away.
    #[test]
    fn test_create_graph() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        sess.execute("CREATE GRAPH mydb").unwrap();

        // Selecting the new graph proves it was registered.
        sess.execute("USE GRAPH mydb").unwrap();
        assert_eq!(sess.current_graph().as_deref(), Some("mydb"));
    }

    /// Creating the same graph twice fails with an "already exists" error.
    #[test]
    fn test_create_graph_duplicate_errors() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        sess.execute("CREATE GRAPH mydb").unwrap();
        let second_attempt = sess.execute("CREATE GRAPH mydb");
        assert!(second_attempt.is_err());

        let err = second_attempt.unwrap_err().to_string();
        assert!(
            err.contains("already exists"),
            "Expected 'already exists' error, got: {err}"
        );
    }

    /// `CREATE GRAPH IF NOT EXISTS` succeeds even when the graph is present.
    #[test]
    fn test_create_graph_if_not_exists() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        // The second statement must succeed silently thanks to IF NOT EXISTS.
        for statement in ["CREATE GRAPH mydb", "CREATE GRAPH IF NOT EXISTS mydb"] {
            sess.execute(statement).unwrap();
        }
    }

    /// A dropped graph can no longer be selected.
    #[test]
    fn test_drop_graph() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        for statement in ["CREATE GRAPH mydb", "DROP GRAPH mydb"] {
            sess.execute(statement).unwrap();
        }

        // The graph is gone, so USE must be rejected.
        assert!(sess.execute("USE GRAPH mydb").is_err());
    }

    /// Dropping an unknown graph fails with a "does not exist" error.
    #[test]
    fn test_drop_graph_nonexistent_errors() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        let outcome = sess.execute("DROP GRAPH nosuchgraph");
        assert!(outcome.is_err());

        let err = outcome.unwrap_err().to_string();
        assert!(
            err.contains("does not exist"),
            "Expected 'does not exist' error, got: {err}"
        );
    }

    /// `DROP GRAPH IF EXISTS` on an unknown graph succeeds silently.
    #[test]
    fn test_drop_graph_if_exists() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        let outcome = sess.execute("DROP GRAPH IF EXISTS nosuchgraph");
        outcome.unwrap();
    }

    /// `START TRANSACTION` / `COMMIT` issued as GQL open and close a
    /// transaction, and the committed insert is visible afterwards.
    #[test]
    fn test_start_transaction_via_gql() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        sess.execute("START TRANSACTION").unwrap();
        assert!(sess.in_transaction());

        sess.execute("INSERT (:Person {name: 'Alix'})").unwrap();
        sess.execute("COMMIT").unwrap();
        assert!(!sess.in_transaction());

        let people = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(people.rows.len(), 1);
    }

    /// An INSERT inside a `READ ONLY` transaction is rejected with a
    /// read-only error.
    #[test]
    fn test_start_transaction_read_only_blocks_insert() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        sess.execute("START TRANSACTION READ ONLY").unwrap();

        let write_attempt = sess.execute("INSERT (:Person {name: 'Alix'})");
        assert!(write_attempt.is_err());

        let err = write_attempt.unwrap_err().to_string();
        assert!(
            err.contains("read-only"),
            "Expected read-only error, got: {err}"
        );

        sess.execute("ROLLBACK").unwrap();
    }

    /// A `READ ONLY` transaction can still read previously committed data.
    #[test]
    fn test_start_transaction_read_only_allows_reads() {
        let database = GrafeoDB::new_in_memory();
        let mut sess = database.session();

        // Seed one node through a regular read-write transaction.
        sess.begin_transaction().unwrap();
        sess.execute("INSERT (:Person {name: 'Alix'})").unwrap();
        sess.commit().unwrap();

        sess.execute("START TRANSACTION READ ONLY").unwrap();
        let people = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(people.rows.len(), 1);
        sess.execute("COMMIT").unwrap();
    }

    /// `ROLLBACK` issued as GQL discards the transaction's insert.
    #[test]
    fn test_rollback_via_gql() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        let script = [
            "START TRANSACTION",
            "INSERT (:Person {name: 'Alix'})",
            "ROLLBACK",
        ];
        for statement in script {
            sess.execute(statement).unwrap();
        }

        // Nothing was committed, so the query comes back empty.
        let people = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert!(people.rows.is_empty());
    }

    /// `START TRANSACTION` accepts an explicit isolation level clause.
    #[test]
    fn test_start_transaction_with_isolation_level() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        let started = sess.execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE");
        started.unwrap();
        assert!(sess.in_transaction());

        sess.execute("ROLLBACK").unwrap();
    }

    /// Session commands produce a result with no rows and no columns.
    #[test]
    fn test_session_commands_return_empty_result() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        sess.execute("CREATE GRAPH test").unwrap();
        let outcome = sess.execute("SESSION SET GRAPH test").unwrap();

        assert_eq!(outcome.row_count(), 0);
        assert_eq!(outcome.column_count(), 0);
    }

    /// A fresh session has no current graph.
    #[test]
    fn test_current_graph_default_is_none() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        assert!(sess.current_graph().is_none());
    }

    /// A fresh session has no time zone.
    #[test]
    fn test_time_zone_default_is_none() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        assert!(sess.time_zone().is_none());
    }

    /// Two sessions on the same database track their current graph separately.
    #[test]
    fn test_session_state_independent_across_sessions() {
        let database = GrafeoDB::new_in_memory();
        let left = database.session();
        let right = database.session();

        // Both graphs are created through the first session.
        for statement in ["CREATE GRAPH first", "CREATE GRAPH second"] {
            left.execute(statement).unwrap();
        }
        left.execute("SESSION SET GRAPH first").unwrap();
        right.execute("SESSION SET GRAPH second").unwrap();

        assert_eq!(left.current_graph().as_deref(), Some("first"));
        assert_eq!(right.current_graph().as_deref(), Some("second"));
    }

    /// `SHOW NODE TYPES` lists a created node type under the expected columns.
    #[test]
    fn test_show_node_types() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        sess.execute("CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)")
            .unwrap();

        let listing = sess.execute("SHOW NODE TYPES").unwrap();

        let expected_columns = vec!["name", "properties", "constraints", "parents"];
        assert_eq!(listing.columns, expected_columns);
        assert_eq!(listing.rows.len(), 1);

        // The type name sits in the first column.
        let type_name = &listing.rows[0][0];
        assert_eq!(*type_name, Value::from("Person"));
    }

    /// `SHOW EDGE TYPES` lists a created edge type under the expected columns.
    #[test]
    fn test_show_edge_types() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        sess.execute("CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)")
            .unwrap();

        let listing = sess.execute("SHOW EDGE TYPES").unwrap();

        let expected_columns = vec!["name", "properties", "source_types", "target_types"];
        assert_eq!(listing.columns, expected_columns);
        assert_eq!(listing.rows.len(), 1);

        let type_name = &listing.rows[0][0];
        assert_eq!(*type_name, Value::from("KNOWS"));
    }

    /// `SHOW GRAPH TYPES` lists a created graph type under the expected columns.
    #[test]
    fn test_show_graph_types() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        let setup = [
            "CREATE NODE TYPE Person (name STRING)",
            "CREATE GRAPH TYPE social (NODE TYPE Person (name STRING))",
        ];
        for statement in setup {
            sess.execute(statement).unwrap();
        }

        let listing = sess.execute("SHOW GRAPH TYPES").unwrap();

        let expected_columns = vec!["name", "open", "node_types", "edge_types"];
        assert_eq!(listing.columns, expected_columns);
        assert_eq!(listing.rows.len(), 1);

        let type_name = &listing.rows[0][0];
        assert_eq!(*type_name, Value::from("social"));
    }

    /// `SHOW GRAPH TYPE <name>` returns the single matching graph type.
    #[test]
    fn test_show_graph_type_named() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        let setup = [
            "CREATE NODE TYPE Person (name STRING)",
            "CREATE GRAPH TYPE social (NODE TYPE Person (name STRING))",
        ];
        for statement in setup {
            sess.execute(statement).unwrap();
        }

        let described = sess.execute("SHOW GRAPH TYPE social").unwrap();
        assert_eq!(described.rows.len(), 1);

        let type_name = &described.rows[0][0];
        assert_eq!(*type_name, Value::from("social"));
    }

    /// `SHOW GRAPH TYPE` on an unknown name is an error.
    #[test]
    fn test_show_graph_type_not_found() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        assert!(sess.execute("SHOW GRAPH TYPE nonexistent").is_err());
    }

    /// `SHOW INDEXES` responds with the expected column header.
    #[test]
    fn test_show_indexes_via_gql() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        let listing = sess.execute("SHOW INDEXES").unwrap();

        let expected_columns = vec!["name", "type", "label", "property"];
        assert_eq!(listing.columns, expected_columns);
    }

    /// `SHOW CONSTRAINTS` responds with the expected column header.
    #[test]
    fn test_show_constraints_via_gql() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        let listing = sess.execute("SHOW CONSTRAINTS").unwrap();

        let expected_columns = vec!["name", "type", "label", "properties"];
        assert_eq!(listing.columns, expected_columns);
    }

    /// A graph type declared in pattern form can be created and then shown.
    #[test]
    fn test_pattern_form_graph_type_roundtrip() {
        let database = GrafeoDB::new_in_memory();
        let sess = database.session();

        // The node and edge types are registered up front.
        let type_ddl = [
            "CREATE NODE TYPE Person (name STRING NOT NULL)",
            "CREATE NODE TYPE City (name STRING)",
            "CREATE EDGE TYPE KNOWS (since INTEGER)",
            "CREATE EDGE TYPE LIVES_IN",
        ];
        for statement in type_ddl {
            sess.execute(statement).unwrap();
        }

        // Graph type written as two path patterns, joined without whitespace.
        let graph_type_ddl = concat!(
            "CREATE GRAPH TYPE social (",
            "(:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),",
            "(:Person)-[:LIVES_IN]->(:City)",
            ")",
        );
        sess.execute(graph_type_ddl).unwrap();

        // Reading it back confirms the graph type was created.
        let described = sess.execute("SHOW GRAPH TYPE social").unwrap();
        assert_eq!(described.rows.len(), 1);

        let type_name = &described.rows[0][0];
        assert_eq!(*type_name, Value::from("social"));
    }
}
6427}