Skip to main content

grafeo_engine/session/
mod.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7#[cfg(feature = "rdf")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::time::{Duration, Instant};
13
14use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId, Value};
15use grafeo_common::utils::error::Result;
16use grafeo_common::{grafeo_debug_span, grafeo_info_span, grafeo_warn};
17use grafeo_core::graph::Direction;
18use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
19#[cfg(feature = "rdf")]
20use grafeo_core::graph::rdf::RdfStore;
21use grafeo_core::graph::{GraphStore, GraphStoreMut};
22
23use crate::catalog::{Catalog, CatalogConstraintValidator};
24use crate::config::{AdaptiveConfig, GraphModel};
25use crate::database::QueryResult;
26use crate::query::cache::QueryCache;
27use crate::transaction::TransactionManager;
28
29/// Storage key suffix for the implicit default graph within a schema.
30/// Auto-created by `CREATE SCHEMA` and auto-dropped by `DROP SCHEMA`.
31const SCHEMA_DEFAULT_GRAPH: &str = "__default__";
32
33/// Parses a DDL default-value literal string into a [`Value`].
34///
35/// Handles string literals (single- or double-quoted), integers, floats,
36/// booleans (`true`/`false`), and `NULL`.
37fn parse_default_literal(text: &str) -> Value {
38    if text.eq_ignore_ascii_case("null") {
39        return Value::Null;
40    }
41    if text.eq_ignore_ascii_case("true") {
42        return Value::Bool(true);
43    }
44    if text.eq_ignore_ascii_case("false") {
45        return Value::Bool(false);
46    }
47    // String literal: strip surrounding quotes
48    if (text.starts_with('\'') && text.ends_with('\''))
49        || (text.starts_with('"') && text.ends_with('"'))
50    {
51        return Value::String(text[1..text.len() - 1].into());
52    }
53    // Try integer, then float
54    if let Ok(i) = text.parse::<i64>() {
55        return Value::Int64(i);
56    }
57    if let Ok(f) = text.parse::<f64>() {
58        return Value::Float64(f);
59    }
60    // Fallback: treat as string
61    Value::String(text.into())
62}
63
64/// Runtime configuration for creating a new session.
65///
66/// Groups the shared parameters passed to all session constructors, keeping
67/// call sites readable and avoiding long argument lists.
68pub(crate) struct SessionConfig {
69    pub transaction_manager: Arc<TransactionManager>,
70    pub query_cache: Arc<QueryCache>,
71    pub catalog: Arc<Catalog>,
72    pub adaptive_config: AdaptiveConfig,
73    pub factorized_execution: bool,
74    pub graph_model: GraphModel,
75    pub query_timeout: Option<Duration>,
76    pub commit_counter: Arc<AtomicUsize>,
77    pub gc_interval: usize,
78    /// When true, the session permanently blocks all mutations.
79    pub read_only: bool,
80}
81
82/// Your handle to the database - execute queries and manage transactions.
83///
84/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
85/// tracks its own transaction state, so you can have multiple concurrent
86/// sessions without them interfering.
87pub struct Session {
88    /// The underlying store.
89    store: Arc<LpgStore>,
90    /// Graph store trait object for pluggable storage backends (read path).
91    graph_store: Arc<dyn GraphStore>,
92    /// Writable graph store (None for read-only databases).
93    graph_store_mut: Option<Arc<dyn GraphStoreMut>>,
94    /// Schema and metadata catalog shared across sessions.
95    catalog: Arc<Catalog>,
96    /// RDF triple store (if RDF feature is enabled).
97    #[cfg(feature = "rdf")]
98    rdf_store: Arc<RdfStore>,
99    /// Transaction manager.
100    transaction_manager: Arc<TransactionManager>,
101    /// Query cache shared across sessions.
102    query_cache: Arc<QueryCache>,
103    /// Current transaction ID (if any). Behind a Mutex so that GQL commands
104    /// (`START TRANSACTION`, `COMMIT`, `ROLLBACK`) can manage transactions
105    /// from within `execute(&self)`.
106    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
107    /// Whether the current transaction is read-only (blocks mutations).
108    read_only_tx: parking_lot::Mutex<bool>,
109    /// Whether the database itself is read-only (set at open time, never changes).
110    /// When true, `read_only_tx` is always true regardless of transaction flags.
111    db_read_only: bool,
112    /// Whether the session is in auto-commit mode.
113    auto_commit: bool,
114    /// Adaptive execution configuration.
115    #[allow(dead_code)] // Stored for future adaptive re-optimization during execution
116    adaptive_config: AdaptiveConfig,
117    /// Whether to use factorized execution for multi-hop queries.
118    factorized_execution: bool,
119    /// The graph data model this session operates on.
120    graph_model: GraphModel,
121    /// Maximum time a query may run before being cancelled.
122    query_timeout: Option<Duration>,
123    /// Shared commit counter for triggering auto-GC.
124    commit_counter: Arc<AtomicUsize>,
125    /// GC every N commits (0 = disabled).
126    gc_interval: usize,
127    /// Node count at the start of the current transaction (for PreparedCommit stats).
128    transaction_start_node_count: AtomicUsize,
129    /// Edge count at the start of the current transaction (for PreparedCommit stats).
130    transaction_start_edge_count: AtomicUsize,
131    /// WAL for logging schema changes.
132    #[cfg(feature = "wal")]
133    wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
134    /// Shared WAL graph context tracker for named graph awareness.
135    #[cfg(feature = "wal")]
136    wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
137    /// CDC log for change tracking.
138    #[cfg(feature = "cdc")]
139    cdc_log: Arc<crate::cdc::CdcLog>,
140    /// Buffered CDC events for the current transaction.
141    /// Flushed to `cdc_log` on commit, discarded on rollback.
142    #[cfg(feature = "cdc")]
143    cdc_pending_events: Option<Arc<parking_lot::Mutex<Vec<crate::cdc::ChangeEvent>>>>,
144    /// Current graph name (for multi-graph USE GRAPH support). None = default graph.
145    current_graph: parking_lot::Mutex<Option<String>>,
146    /// Current schema name (ISO/IEC 39075 Section 4.7.3: independent from session graph).
147    /// None = "not set" (uses default schema).
148    current_schema: parking_lot::Mutex<Option<String>>,
149    /// Session time zone override.
150    time_zone: parking_lot::Mutex<Option<String>>,
151    /// Session-level parameters (SET PARAMETER).
152    session_params:
153        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
154    /// Override epoch for time-travel queries (None = use transaction/current epoch).
155    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
156    /// Savepoints within the current transaction.
157    savepoints: parking_lot::Mutex<Vec<SavepointState>>,
158    /// Nesting depth for nested transactions (0 = outermost).
159    /// Nested `START TRANSACTION` creates an auto-savepoint; nested `COMMIT`
160    /// releases it, nested `ROLLBACK` rolls back to it.
161    transaction_nesting_depth: parking_lot::Mutex<u32>,
162    /// Named graphs touched during the current transaction (for cross-graph atomicity).
163    /// `None` represents the default graph. Populated at `BEGIN` time and on each
164    /// `USE GRAPH` / `SESSION SET GRAPH` switch within a transaction.
165    touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
166    /// Shared metrics registry (populated when the `metrics` feature is enabled).
167    #[cfg(feature = "metrics")]
168    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
169    /// Transaction start time for duration tracking.
170    #[cfg(feature = "metrics")]
171    tx_start_time: parking_lot::Mutex<Option<Instant>>,
172}
173
174/// Per-graph savepoint snapshot, capturing the store state at the time of the savepoint.
175#[derive(Clone)]
176struct GraphSavepoint {
177    graph_name: Option<String>,
178    next_node_id: u64,
179    next_edge_id: u64,
180    undo_log_position: usize,
181}
182
183/// Savepoint state: name + per-graph snapshots + the graph that was active.
184#[derive(Clone)]
185struct SavepointState {
186    name: String,
187    graph_snapshots: Vec<GraphSavepoint>,
188    /// The graph that was active when the savepoint was created.
189    /// Reserved for future use (e.g., restoring graph context on rollback).
190    #[allow(dead_code)]
191    active_graph: Option<String>,
192    /// CDC event buffer position at savepoint creation.
193    /// On rollback-to-savepoint, the buffer is truncated to this position.
194    #[cfg(feature = "cdc")]
195    cdc_event_position: usize,
196}
197
198impl Session {
199    /// Creates a new session with adaptive execution configuration.
200    #[allow(dead_code)]
201    pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
202        let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
203        let graph_store_mut = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
204        Self {
205            store,
206            graph_store,
207            graph_store_mut,
208            catalog: cfg.catalog,
209            #[cfg(feature = "rdf")]
210            rdf_store: Arc::new(RdfStore::new()),
211            transaction_manager: cfg.transaction_manager,
212            query_cache: cfg.query_cache,
213            current_transaction: parking_lot::Mutex::new(None),
214            read_only_tx: parking_lot::Mutex::new(cfg.read_only),
215            db_read_only: cfg.read_only,
216            auto_commit: true,
217            adaptive_config: cfg.adaptive_config,
218            factorized_execution: cfg.factorized_execution,
219            graph_model: cfg.graph_model,
220            query_timeout: cfg.query_timeout,
221            commit_counter: cfg.commit_counter,
222            gc_interval: cfg.gc_interval,
223            transaction_start_node_count: AtomicUsize::new(0),
224            transaction_start_edge_count: AtomicUsize::new(0),
225            #[cfg(feature = "wal")]
226            wal: None,
227            #[cfg(feature = "wal")]
228            wal_graph_context: None,
229            #[cfg(feature = "cdc")]
230            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
231            #[cfg(feature = "cdc")]
232            cdc_pending_events: None,
233            current_graph: parking_lot::Mutex::new(None),
234            current_schema: parking_lot::Mutex::new(None),
235            time_zone: parking_lot::Mutex::new(None),
236            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
237            viewing_epoch_override: parking_lot::Mutex::new(None),
238            savepoints: parking_lot::Mutex::new(Vec::new()),
239            transaction_nesting_depth: parking_lot::Mutex::new(0),
240            touched_graphs: parking_lot::Mutex::new(Vec::new()),
241            #[cfg(feature = "metrics")]
242            metrics: None,
243            #[cfg(feature = "metrics")]
244            tx_start_time: parking_lot::Mutex::new(None),
245        }
246    }
247
248    /// Sets the WAL for this session (shared with the database).
249    ///
250    /// This also wraps `graph_store` in a [`WalGraphStore`] so that mutation
251    /// operators (INSERT, DELETE, SET via queries) log to the WAL.
252    #[cfg(feature = "wal")]
253    pub(crate) fn set_wal(
254        &mut self,
255        wal: Arc<grafeo_adapters::storage::wal::LpgWal>,
256        wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
257    ) {
258        // Wrap the graph store so query-engine mutations are WAL-logged
259        let wal_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
260            Arc::clone(&self.store),
261            Arc::clone(&wal),
262            Arc::clone(&wal_graph_context),
263        ));
264        self.graph_store = Arc::clone(&wal_store) as Arc<dyn GraphStore>;
265        self.graph_store_mut = Some(wal_store as Arc<dyn GraphStoreMut>);
266        self.wal = Some(wal);
267        self.wal_graph_context = Some(wal_graph_context);
268    }
269
270    /// Sets the CDC log for this session (shared with the database).
271    ///
272    /// Wraps the current write store with a `CdcGraphStore` decorator so
273    /// that all session mutations (INSERT, SET, DELETE via query execution)
274    /// buffer CDC events. The buffer is flushed to the `CdcLog` on commit
275    /// and discarded on rollback.
276    #[cfg(feature = "cdc")]
277    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
278        // Wrap the WRITE store only with CdcGraphStore to intercept mutations.
279        // The read store (self.graph_store) is left unchanged for zero read overhead.
280        if let Some(ref write_store) = self.graph_store_mut {
281            let cdc_store = Arc::new(crate::database::cdc_store::CdcGraphStore::new(
282                Arc::clone(write_store),
283                Arc::clone(&cdc_log),
284            ));
285            self.cdc_pending_events = Some(cdc_store.pending_events());
286            self.graph_store_mut = Some(cdc_store as Arc<dyn grafeo_core::graph::GraphStoreMut>);
287        }
288        self.cdc_log = cdc_log;
289    }
290
291    /// Sets the metrics registry for this session (shared with the database).
292    #[cfg(feature = "metrics")]
293    pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
294        self.metrics = Some(metrics);
295    }
296
297    /// Creates a session backed by an external graph store.
298    ///
299    /// The external store handles all data operations. Transaction management
300    /// (begin/commit/rollback) is not supported for external stores.
301    ///
302    /// # Errors
303    ///
304    /// Returns an error if the internal arena allocation fails (out of memory).
305    pub(crate) fn with_external_store(
306        read_store: Arc<dyn GraphStore>,
307        write_store: Option<Arc<dyn GraphStoreMut>>,
308        cfg: SessionConfig,
309    ) -> Result<Self> {
310        Ok(Self {
311            store: Arc::new(LpgStore::new()?),
312            graph_store: read_store,
313            graph_store_mut: write_store,
314            catalog: cfg.catalog,
315            #[cfg(feature = "rdf")]
316            rdf_store: Arc::new(RdfStore::new()),
317            transaction_manager: cfg.transaction_manager,
318            query_cache: cfg.query_cache,
319            current_transaction: parking_lot::Mutex::new(None),
320            read_only_tx: parking_lot::Mutex::new(cfg.read_only),
321            db_read_only: cfg.read_only,
322            auto_commit: true,
323            adaptive_config: cfg.adaptive_config,
324            factorized_execution: cfg.factorized_execution,
325            graph_model: cfg.graph_model,
326            query_timeout: cfg.query_timeout,
327            commit_counter: cfg.commit_counter,
328            gc_interval: cfg.gc_interval,
329            transaction_start_node_count: AtomicUsize::new(0),
330            transaction_start_edge_count: AtomicUsize::new(0),
331            #[cfg(feature = "wal")]
332            wal: None,
333            #[cfg(feature = "wal")]
334            wal_graph_context: None,
335            #[cfg(feature = "cdc")]
336            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
337            #[cfg(feature = "cdc")]
338            cdc_pending_events: None,
339            current_graph: parking_lot::Mutex::new(None),
340            current_schema: parking_lot::Mutex::new(None),
341            time_zone: parking_lot::Mutex::new(None),
342            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
343            viewing_epoch_override: parking_lot::Mutex::new(None),
344            savepoints: parking_lot::Mutex::new(Vec::new()),
345            transaction_nesting_depth: parking_lot::Mutex::new(0),
346            touched_graphs: parking_lot::Mutex::new(Vec::new()),
347            #[cfg(feature = "metrics")]
348            metrics: None,
349            #[cfg(feature = "metrics")]
350            tx_start_time: parking_lot::Mutex::new(None),
351        })
352    }
353
354    /// Returns the graph model this session operates on.
355    #[must_use]
356    pub fn graph_model(&self) -> GraphModel {
357        self.graph_model
358    }
359
360    // === Session State Management ===
361
362    /// Sets the current graph for this session (USE GRAPH).
363    pub fn use_graph(&self, name: &str) {
364        *self.current_graph.lock() = Some(name.to_string());
365    }
366
367    /// Returns the current graph name, if any.
368    #[must_use]
369    pub fn current_graph(&self) -> Option<String> {
370        self.current_graph.lock().clone()
371    }
372
373    /// Sets the current schema for this session (SESSION SET SCHEMA).
374    ///
375    /// Per ISO/IEC 39075 Section 7.1 GR1, this is independent of the session graph.
376    pub fn set_schema(&self, name: &str) {
377        *self.current_schema.lock() = Some(name.to_string());
378    }
379
380    /// Returns the current schema name, if any.
381    ///
382    /// `None` means "not set", which resolves to the default schema.
383    #[must_use]
384    pub fn current_schema(&self) -> Option<String> {
385        self.current_schema.lock().clone()
386    }
387
388    /// Computes the effective storage key for a graph, accounting for schema context.
389    ///
390    /// Per ISO/IEC 39075 Section 17.2, graphs resolve relative to the current schema.
391    /// Uses `/` as separator since it is invalid in GQL identifiers.
392    fn effective_graph_key(&self, graph_name: &str) -> String {
393        let schema = self.current_schema.lock().clone();
394        match schema {
395            Some(s) => format!("{s}/{graph_name}"),
396            None => graph_name.to_string(),
397        }
398    }
399
400    /// Computes the effective storage key for a type, accounting for schema context.
401    ///
402    /// Mirrors `effective_graph_key()`: types resolve relative to the current schema.
403    fn effective_type_key(&self, type_name: &str) -> String {
404        let schema = self.current_schema.lock().clone();
405        match schema {
406            Some(s) => format!("{s}/{type_name}"),
407            None => type_name.to_string(),
408        }
409    }
410
411    /// Returns the effective storage key for the current graph, accounting for schema.
412    ///
413    /// Combines `current_schema` and `current_graph` into a flat lookup key.
414    fn active_graph_storage_key(&self) -> Option<String> {
415        let graph = self.current_graph.lock().clone();
416        let schema = self.current_schema.lock().clone();
417        match (&schema, &graph) {
418            (None, None) => None,
419            (Some(s), None) => Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}")),
420            (None, Some(name)) if name.eq_ignore_ascii_case("default") => None,
421            (Some(s), Some(name)) if name.eq_ignore_ascii_case("default") => {
422                Some(format!("{s}/{SCHEMA_DEFAULT_GRAPH}"))
423            }
424            (None, Some(name)) => Some(name.clone()),
425            (Some(s), Some(g)) => Some(format!("{s}/{g}")),
426        }
427    }
428
429    /// Returns the graph store for the currently active graph.
430    ///
431    /// If `current_graph` is `None` or `"default"`, returns the session's
432    /// default `graph_store` (already WAL-wrapped for the default graph).
433    /// Otherwise looks up the named graph in the root store and wraps it
434    /// in a [`WalGraphStore`] so mutations are WAL-logged with the correct
435    /// graph context.
436    fn active_store(&self) -> Arc<dyn GraphStore> {
437        let key = self.active_graph_storage_key();
438        match key {
439            None => Arc::clone(&self.graph_store),
440            Some(ref name) => match self.store.graph(name) {
441                Some(named_store) => {
442                    #[cfg(feature = "wal")]
443                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
444                        return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
445                            named_store,
446                            Arc::clone(wal),
447                            name.clone(),
448                            Arc::clone(ctx),
449                        )) as Arc<dyn GraphStore>;
450                    }
451                    named_store as Arc<dyn GraphStore>
452                }
453                None => Arc::clone(&self.graph_store),
454            },
455        }
456    }
457
458    /// Returns the writable store for the active graph, if available.
459    ///
460    /// Returns `None` for read-only databases. For named graphs, wraps
461    /// the store with WAL logging when durability is enabled.
462    fn active_write_store(&self) -> Option<Arc<dyn GraphStoreMut>> {
463        let key = self.active_graph_storage_key();
464        match key {
465            None => self.graph_store_mut.as_ref().map(Arc::clone),
466            Some(ref name) => match self.store.graph(name) {
467                Some(named_store) => {
468                    let mut store: Arc<dyn GraphStoreMut> = named_store;
469
470                    #[cfg(feature = "wal")]
471                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
472                        store = Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
473                            // WAL needs Arc<LpgStore>, get it fresh
474                            self.store
475                                .graph(name)
476                                .unwrap_or_else(|| Arc::clone(&self.store)),
477                            Arc::clone(wal),
478                            name.clone(),
479                            Arc::clone(ctx),
480                        ));
481                    }
482
483                    #[cfg(feature = "cdc")]
484                    if let Some(ref pending) = self.cdc_pending_events {
485                        store = Arc::new(crate::database::cdc_store::CdcGraphStore::wrap(
486                            store,
487                            Arc::clone(&self.cdc_log),
488                            Arc::clone(pending),
489                        ));
490                    }
491
492                    Some(store)
493                }
494                None => self.graph_store_mut.as_ref().map(Arc::clone),
495            },
496        }
497    }
498
499    /// Returns the concrete `LpgStore` for the currently active graph.
500    ///
501    /// Used by direct CRUD methods that need the concrete store type
502    /// for versioned operations.
503    fn active_lpg_store(&self) -> Arc<LpgStore> {
504        let key = self.active_graph_storage_key();
505        match key {
506            None => Arc::clone(&self.store),
507            Some(ref name) => self
508                .store
509                .graph(name)
510                .unwrap_or_else(|| Arc::clone(&self.store)),
511        }
512    }
513
514    /// Resolves a graph name to a concrete `LpgStore`.
515    /// `None` and `"default"` resolve to the session's root store.
516    fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
517        match graph_name {
518            None => Arc::clone(&self.store),
519            Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
520            Some(name) => self
521                .store
522                .graph(name)
523                .unwrap_or_else(|| Arc::clone(&self.store)),
524        }
525    }
526
527    /// Records the current graph as "touched" if a transaction is active.
528    ///
529    /// Uses the full storage key (schema/graph) so that commit/rollback
530    /// can resolve the correct store via `resolve_store`.
531    fn track_graph_touch(&self) {
532        if self.current_transaction.lock().is_some() {
533            let key = self.active_graph_storage_key();
534            let mut touched = self.touched_graphs.lock();
535            if !touched.contains(&key) {
536                touched.push(key);
537            }
538        }
539    }
540
541    /// Sets the session time zone.
542    pub fn set_time_zone(&self, tz: &str) {
543        *self.time_zone.lock() = Some(tz.to_string());
544    }
545
546    /// Returns the session time zone, if set.
547    #[must_use]
548    pub fn time_zone(&self) -> Option<String> {
549        self.time_zone.lock().clone()
550    }
551
552    /// Sets a session parameter.
553    pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
554        self.session_params.lock().insert(key.to_string(), value);
555    }
556
557    /// Gets a session parameter by cloning it.
558    #[must_use]
559    pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
560        self.session_params.lock().get(key).cloned()
561    }
562
563    /// Resets all session state to defaults (ISO/IEC 39075 Section 7.2).
564    pub fn reset_session(&self) {
565        *self.current_schema.lock() = None;
566        *self.current_graph.lock() = None;
567        *self.time_zone.lock() = None;
568        self.session_params.lock().clear();
569        *self.viewing_epoch_override.lock() = None;
570    }
571
572    /// Resets only the session schema (Section 7.2 GR1).
573    pub fn reset_schema(&self) {
574        *self.current_schema.lock() = None;
575    }
576
577    /// Resets only the session graph (Section 7.2 GR2).
578    pub fn reset_graph(&self) {
579        *self.current_graph.lock() = None;
580    }
581
582    /// Resets only the session time zone (Section 7.2 GR3).
583    pub fn reset_time_zone(&self) {
584        *self.time_zone.lock() = None;
585    }
586
587    /// Resets only session parameters (Section 7.2 GR4).
588    pub fn reset_parameters(&self) {
589        self.session_params.lock().clear();
590    }
591
592    // --- Time-travel API ---
593
594    /// Sets a viewing epoch override for time-travel queries.
595    ///
596    /// While set, all queries on this session see the database as it existed
597    /// at the given epoch. Use [`clear_viewing_epoch`](Self::clear_viewing_epoch)
598    /// to return to normal behavior.
599    pub fn set_viewing_epoch(&self, epoch: EpochId) {
600        *self.viewing_epoch_override.lock() = Some(epoch);
601    }
602
603    /// Clears the viewing epoch override, returning to normal behavior.
604    pub fn clear_viewing_epoch(&self) {
605        *self.viewing_epoch_override.lock() = None;
606    }
607
608    /// Returns the current viewing epoch override, if any.
609    #[must_use]
610    pub fn viewing_epoch(&self) -> Option<EpochId> {
611        *self.viewing_epoch_override.lock()
612    }
613
614    /// Returns all versions of a node with their creation/deletion epochs.
615    ///
616    /// Properties and labels reflect the current state (not versioned per-epoch).
617    #[must_use]
618    pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
619        self.active_lpg_store().get_node_history(id)
620    }
621
622    /// Returns all versions of an edge with their creation/deletion epochs.
623    ///
624    /// Properties reflect the current state (not versioned per-epoch).
625    #[must_use]
626    pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
627        self.active_lpg_store().get_edge_history(id)
628    }
629
630    /// Checks that the session's graph model supports LPG operations.
631    fn require_lpg(&self, language: &str) -> Result<()> {
632        if self.graph_model == GraphModel::Rdf {
633            return Err(grafeo_common::utils::error::Error::Internal(format!(
634                "This is an RDF database. {language} queries require an LPG database."
635            )));
636        }
637        Ok(())
638    }
639
640    /// Executes a session or transaction command, returning an empty result.
641    #[cfg(feature = "gql")]
642    fn execute_session_command(
643        &self,
644        cmd: grafeo_adapters::query::gql::ast::SessionCommand,
645    ) -> Result<QueryResult> {
646        use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
647        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
648
649        // Block DDL in read-only transactions (ISO/IEC 39075 Section 8)
650        if *self.read_only_tx.lock() {
651            match &cmd {
652                SessionCommand::CreateGraph { .. } | SessionCommand::DropGraph { .. } => {
653                    return Err(Error::Transaction(
654                        grafeo_common::utils::error::TransactionError::ReadOnly,
655                    ));
656                }
657                _ => {} // Session state + transaction control allowed
658            }
659        }
660
661        match cmd {
662            SessionCommand::CreateGraph {
663                name,
664                if_not_exists,
665                typed,
666                like_graph,
667                copy_of,
668                open: _,
669            } => {
670                // ISO/IEC 39075 Section 12.4: graphs are created within the current schema
671                let storage_key = self.effective_graph_key(&name);
672
673                // Validate source graph exists for LIKE / AS COPY OF
674                if let Some(ref src) = like_graph {
675                    let src_key = self.effective_graph_key(src);
676                    if self.store.graph(&src_key).is_none() {
677                        return Err(Error::Query(QueryError::new(
678                            QueryErrorKind::Semantic,
679                            format!("Source graph '{src}' does not exist"),
680                        )));
681                    }
682                }
683                if let Some(ref src) = copy_of {
684                    let src_key = self.effective_graph_key(src);
685                    if self.store.graph(&src_key).is_none() {
686                        return Err(Error::Query(QueryError::new(
687                            QueryErrorKind::Semantic,
688                            format!("Source graph '{src}' does not exist"),
689                        )));
690                    }
691                }
692
693                let created = self
694                    .store
695                    .create_graph(&storage_key)
696                    .map_err(|e| Error::Internal(e.to_string()))?;
697                if !created && !if_not_exists {
698                    return Err(Error::Query(QueryError::new(
699                        QueryErrorKind::Semantic,
700                        format!("Graph '{name}' already exists"),
701                    )));
702                }
703                if created {
704                    #[cfg(feature = "wal")]
705                    self.log_schema_wal(
706                        &grafeo_adapters::storage::wal::WalRecord::CreateNamedGraph {
707                            name: storage_key.clone(),
708                        },
709                    );
710                }
711
712                // AS COPY OF: copy data from source graph
713                if let Some(ref src) = copy_of {
714                    let src_key = self.effective_graph_key(src);
715                    self.store
716                        .copy_graph(Some(&src_key), Some(&storage_key))
717                        .map_err(|e| Error::Internal(e.to_string()))?;
718                }
719
720                // Bind to graph type if specified.
721                // If the parser produced a '/' in the name it is already a qualified
722                // "schema/type" key; otherwise resolve against the current schema.
723                if let Some(type_name) = typed
724                    && let Err(e) = self.catalog.bind_graph_type(
725                        &storage_key,
726                        if type_name.contains('/') {
727                            type_name.clone()
728                        } else {
729                            self.effective_type_key(&type_name)
730                        },
731                    )
732                {
733                    return Err(Error::Query(QueryError::new(
734                        QueryErrorKind::Semantic,
735                        e.to_string(),
736                    )));
737                }
738
739                // LIKE: copy graph type binding from source
740                if let Some(ref src) = like_graph {
741                    let src_key = self.effective_graph_key(src);
742                    if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
743                        let _ = self.catalog.bind_graph_type(&storage_key, src_type);
744                    }
745                }
746
747                Ok(QueryResult::empty())
748            }
749            SessionCommand::DropGraph { name, if_exists } => {
750                let storage_key = self.effective_graph_key(&name);
751                let dropped = self.store.drop_graph(&storage_key);
752                if !dropped && !if_exists {
753                    return Err(Error::Query(QueryError::new(
754                        QueryErrorKind::Semantic,
755                        format!("Graph '{name}' does not exist"),
756                    )));
757                }
758                if dropped {
759                    #[cfg(feature = "wal")]
760                    self.log_schema_wal(
761                        &grafeo_adapters::storage::wal::WalRecord::DropNamedGraph {
762                            name: storage_key.clone(),
763                        },
764                    );
765                    // If this session was using the dropped graph, reset to default
766                    let mut current = self.current_graph.lock();
767                    if current
768                        .as_deref()
769                        .is_some_and(|g| g.eq_ignore_ascii_case(&name))
770                    {
771                        *current = None;
772                    }
773                }
774                Ok(QueryResult::empty())
775            }
776            SessionCommand::UseGraph(name) => {
777                // Verify graph exists (resolve within current schema)
778                let effective_key = self.effective_graph_key(&name);
779                if !name.eq_ignore_ascii_case("default")
780                    && self.store.graph(&effective_key).is_none()
781                {
782                    return Err(Error::Query(QueryError::new(
783                        QueryErrorKind::Semantic,
784                        format!("Graph '{name}' does not exist"),
785                    )));
786                }
787                self.use_graph(&name);
788                // Track the new graph if in a transaction
789                self.track_graph_touch();
790                Ok(QueryResult::empty())
791            }
792            SessionCommand::SessionSetGraph(name) => {
793                // ISO/IEC 39075 Section 7.1 GR2: set session graph (resolved within current schema)
794                let effective_key = self.effective_graph_key(&name);
795                if !name.eq_ignore_ascii_case("default")
796                    && self.store.graph(&effective_key).is_none()
797                {
798                    return Err(Error::Query(QueryError::new(
799                        QueryErrorKind::Semantic,
800                        format!("Graph '{name}' does not exist"),
801                    )));
802                }
803                self.use_graph(&name);
804                // Track the new graph if in a transaction
805                self.track_graph_touch();
806                Ok(QueryResult::empty())
807            }
808            SessionCommand::SessionSetSchema(name) => {
809                // ISO/IEC 39075 Section 7.1 GR1: set session schema (independent of graph)
810                if !self.catalog.schema_exists(&name) {
811                    return Err(Error::Query(QueryError::new(
812                        QueryErrorKind::Semantic,
813                        format!("Schema '{name}' does not exist"),
814                    )));
815                }
816                self.set_schema(&name);
817                Ok(QueryResult::empty())
818            }
819            SessionCommand::SessionSetTimeZone(tz) => {
820                self.set_time_zone(&tz);
821                Ok(QueryResult::empty())
822            }
823            SessionCommand::SessionSetParameter(key, expr) => {
824                if key.eq_ignore_ascii_case("viewing_epoch") {
825                    match Self::eval_integer_literal(&expr) {
826                        Some(n) if n >= 0 => {
827                            self.set_viewing_epoch(EpochId::new(n as u64));
828                            Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
829                        }
830                        _ => Err(Error::Query(QueryError::new(
831                            QueryErrorKind::Semantic,
832                            "viewing_epoch must be a non-negative integer literal",
833                        ))),
834                    }
835                } else {
836                    // For now, store parameter name with Null value.
837                    // Full expression evaluation would require building and executing a plan.
838                    self.set_parameter(&key, Value::Null);
839                    Ok(QueryResult::empty())
840                }
841            }
842            SessionCommand::SessionReset(target) => {
843                use grafeo_adapters::query::gql::ast::SessionResetTarget;
844                match target {
845                    SessionResetTarget::All => self.reset_session(),
846                    SessionResetTarget::Schema => self.reset_schema(),
847                    SessionResetTarget::Graph => self.reset_graph(),
848                    SessionResetTarget::TimeZone => self.reset_time_zone(),
849                    SessionResetTarget::Parameters => self.reset_parameters(),
850                }
851                Ok(QueryResult::empty())
852            }
853            SessionCommand::SessionClose => {
854                self.reset_session();
855                Ok(QueryResult::empty())
856            }
857            SessionCommand::StartTransaction {
858                read_only,
859                isolation_level,
860            } => {
861                let engine_level = isolation_level.map(|l| match l {
862                    TransactionIsolationLevel::ReadCommitted => {
863                        crate::transaction::IsolationLevel::ReadCommitted
864                    }
865                    TransactionIsolationLevel::SnapshotIsolation => {
866                        crate::transaction::IsolationLevel::SnapshotIsolation
867                    }
868                    TransactionIsolationLevel::Serializable => {
869                        crate::transaction::IsolationLevel::Serializable
870                    }
871                });
872                self.begin_transaction_inner(read_only, engine_level)?;
873                Ok(QueryResult::status("Transaction started"))
874            }
875            SessionCommand::Commit => {
876                self.commit_inner()?;
877                Ok(QueryResult::status("Transaction committed"))
878            }
879            SessionCommand::Rollback => {
880                self.rollback_inner()?;
881                Ok(QueryResult::status("Transaction rolled back"))
882            }
883            SessionCommand::Savepoint(name) => {
884                self.savepoint(&name)?;
885                Ok(QueryResult::status(format!("Savepoint '{name}' created")))
886            }
887            SessionCommand::RollbackToSavepoint(name) => {
888                self.rollback_to_savepoint(&name)?;
889                Ok(QueryResult::status(format!(
890                    "Rolled back to savepoint '{name}'"
891                )))
892            }
893            SessionCommand::ReleaseSavepoint(name) => {
894                self.release_savepoint(&name)?;
895                Ok(QueryResult::status(format!("Savepoint '{name}' released")))
896            }
897        }
898    }
899
900    /// Logs a WAL record for a schema change (no-op if WAL is not enabled).
901    #[cfg(feature = "wal")]
902    fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
903        if let Some(ref wal) = self.wal
904            && let Err(e) = wal.log(record)
905        {
906            grafeo_warn!("Failed to log schema change to WAL: {}", e);
907        }
908    }
909
910    /// Executes a schema DDL command, returning a status result.
911    #[cfg(feature = "gql")]
912    fn execute_schema_command(
913        &self,
914        cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
915    ) -> Result<QueryResult> {
916        use crate::catalog::{
917            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
918        };
919        use grafeo_adapters::query::gql::ast::SchemaStatement;
920        #[cfg(feature = "wal")]
921        use grafeo_adapters::storage::wal::WalRecord;
922        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
923
924        /// Logs a WAL record for schema changes. Compiles to nothing without `wal`.
925        macro_rules! wal_log {
926            ($self:expr, $record:expr) => {
927                #[cfg(feature = "wal")]
928                $self.log_schema_wal(&$record);
929            };
930        }
931
932        let result = match cmd {
933            SchemaStatement::CreateNodeType(stmt) => {
934                let effective_name = self.effective_type_key(&stmt.name);
935                #[cfg(feature = "wal")]
936                let props_for_wal: Vec<(String, String, bool)> = stmt
937                    .properties
938                    .iter()
939                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
940                    .collect();
941                let def = NodeTypeDefinition {
942                    name: effective_name.clone(),
943                    properties: stmt
944                        .properties
945                        .iter()
946                        .map(|p| TypedProperty {
947                            name: p.name.clone(),
948                            data_type: PropertyDataType::from_type_name(&p.data_type),
949                            nullable: p.nullable,
950                            default_value: p
951                                .default_value
952                                .as_ref()
953                                .map(|s| parse_default_literal(s)),
954                        })
955                        .collect(),
956                    constraints: Vec::new(),
957                    parent_types: stmt.parent_types.clone(),
958                };
959                let result = if stmt.or_replace {
960                    let _ = self.catalog.drop_node_type(&effective_name);
961                    self.catalog.register_node_type(def)
962                } else {
963                    self.catalog.register_node_type(def)
964                };
965                match result {
966                    Ok(()) => {
967                        wal_log!(
968                            self,
969                            WalRecord::CreateNodeType {
970                                name: effective_name.clone(),
971                                properties: props_for_wal,
972                                constraints: Vec::new(),
973                            }
974                        );
975                        Ok(QueryResult::status(format!(
976                            "Created node type '{}'",
977                            stmt.name
978                        )))
979                    }
980                    Err(e) if stmt.if_not_exists => {
981                        let _ = e;
982                        Ok(QueryResult::status("No change"))
983                    }
984                    Err(e) => Err(Error::Query(QueryError::new(
985                        QueryErrorKind::Semantic,
986                        e.to_string(),
987                    ))),
988                }
989            }
990            SchemaStatement::CreateEdgeType(stmt) => {
991                let effective_name = self.effective_type_key(&stmt.name);
992                #[cfg(feature = "wal")]
993                let props_for_wal: Vec<(String, String, bool)> = stmt
994                    .properties
995                    .iter()
996                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
997                    .collect();
998                let def = EdgeTypeDefinition {
999                    name: effective_name.clone(),
1000                    properties: stmt
1001                        .properties
1002                        .iter()
1003                        .map(|p| TypedProperty {
1004                            name: p.name.clone(),
1005                            data_type: PropertyDataType::from_type_name(&p.data_type),
1006                            nullable: p.nullable,
1007                            default_value: p
1008                                .default_value
1009                                .as_ref()
1010                                .map(|s| parse_default_literal(s)),
1011                        })
1012                        .collect(),
1013                    constraints: Vec::new(),
1014                    source_node_types: stmt.source_node_types.clone(),
1015                    target_node_types: stmt.target_node_types.clone(),
1016                };
1017                let result = if stmt.or_replace {
1018                    let _ = self.catalog.drop_edge_type_def(&effective_name);
1019                    self.catalog.register_edge_type_def(def)
1020                } else {
1021                    self.catalog.register_edge_type_def(def)
1022                };
1023                match result {
1024                    Ok(()) => {
1025                        wal_log!(
1026                            self,
1027                            WalRecord::CreateEdgeType {
1028                                name: effective_name.clone(),
1029                                properties: props_for_wal,
1030                                constraints: Vec::new(),
1031                            }
1032                        );
1033                        Ok(QueryResult::status(format!(
1034                            "Created edge type '{}'",
1035                            stmt.name
1036                        )))
1037                    }
1038                    Err(e) if stmt.if_not_exists => {
1039                        let _ = e;
1040                        Ok(QueryResult::status("No change"))
1041                    }
1042                    Err(e) => Err(Error::Query(QueryError::new(
1043                        QueryErrorKind::Semantic,
1044                        e.to_string(),
1045                    ))),
1046                }
1047            }
1048            SchemaStatement::CreateVectorIndex(stmt) => {
1049                Self::create_vector_index_on_store(
1050                    &self.active_lpg_store(),
1051                    &stmt.node_label,
1052                    &stmt.property,
1053                    stmt.dimensions,
1054                    stmt.metric.as_deref(),
1055                )?;
1056                wal_log!(
1057                    self,
1058                    WalRecord::CreateIndex {
1059                        name: stmt.name.clone(),
1060                        label: stmt.node_label.clone(),
1061                        property: stmt.property.clone(),
1062                        index_type: "vector".to_string(),
1063                    }
1064                );
1065                Ok(QueryResult::status(format!(
1066                    "Created vector index '{}'",
1067                    stmt.name
1068                )))
1069            }
1070            SchemaStatement::DropNodeType { name, if_exists } => {
1071                let effective_name = self.effective_type_key(&name);
1072                match self.catalog.drop_node_type(&effective_name) {
1073                    Ok(()) => {
1074                        wal_log!(
1075                            self,
1076                            WalRecord::DropNodeType {
1077                                name: effective_name
1078                            }
1079                        );
1080                        Ok(QueryResult::status(format!("Dropped node type '{name}'")))
1081                    }
1082                    Err(e) if if_exists => {
1083                        let _ = e;
1084                        Ok(QueryResult::status("No change"))
1085                    }
1086                    Err(e) => Err(Error::Query(QueryError::new(
1087                        QueryErrorKind::Semantic,
1088                        e.to_string(),
1089                    ))),
1090                }
1091            }
1092            SchemaStatement::DropEdgeType { name, if_exists } => {
1093                let effective_name = self.effective_type_key(&name);
1094                match self.catalog.drop_edge_type_def(&effective_name) {
1095                    Ok(()) => {
1096                        wal_log!(
1097                            self,
1098                            WalRecord::DropEdgeType {
1099                                name: effective_name
1100                            }
1101                        );
1102                        Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
1103                    }
1104                    Err(e) if if_exists => {
1105                        let _ = e;
1106                        Ok(QueryResult::status("No change"))
1107                    }
1108                    Err(e) => Err(Error::Query(QueryError::new(
1109                        QueryErrorKind::Semantic,
1110                        e.to_string(),
1111                    ))),
1112                }
1113            }
1114            SchemaStatement::CreateIndex(stmt) => {
1115                use crate::catalog::IndexType as CatalogIndexType;
1116                use grafeo_adapters::query::gql::ast::IndexKind;
1117                let active = self.active_lpg_store();
1118                let index_type_str = match stmt.index_kind {
1119                    IndexKind::Property => "property",
1120                    IndexKind::BTree => "btree",
1121                    IndexKind::Text => "text",
1122                    IndexKind::Vector => "vector",
1123                };
1124                match stmt.index_kind {
1125                    IndexKind::Property | IndexKind::BTree => {
1126                        for prop in &stmt.properties {
1127                            active.create_property_index(prop);
1128                        }
1129                    }
1130                    IndexKind::Text => {
1131                        for prop in &stmt.properties {
1132                            Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1133                        }
1134                    }
1135                    IndexKind::Vector => {
1136                        for prop in &stmt.properties {
1137                            Self::create_vector_index_on_store(
1138                                &active,
1139                                &stmt.label,
1140                                prop,
1141                                stmt.options.dimensions,
1142                                stmt.options.metric.as_deref(),
1143                            )?;
1144                        }
1145                    }
1146                }
1147                // Register each property index in the catalog for SHOW INDEXES
1148                // and DROP INDEX name-based lookup.
1149                let catalog_index_type = match stmt.index_kind {
1150                    IndexKind::Property => CatalogIndexType::Hash,
1151                    IndexKind::BTree => CatalogIndexType::BTree,
1152                    IndexKind::Text => CatalogIndexType::FullText,
1153                    IndexKind::Vector => CatalogIndexType::Hash,
1154                };
1155                let label_id = self.catalog.get_or_create_label(&stmt.label);
1156                for prop in &stmt.properties {
1157                    let prop_id = self.catalog.get_or_create_property_key(prop);
1158                    self.catalog
1159                        .create_index(&stmt.name, label_id, prop_id, catalog_index_type);
1160                }
1161                #[cfg(feature = "wal")]
1162                for prop in &stmt.properties {
1163                    wal_log!(
1164                        self,
1165                        WalRecord::CreateIndex {
1166                            name: stmt.name.clone(),
1167                            label: stmt.label.clone(),
1168                            property: prop.clone(),
1169                            index_type: index_type_str.to_string(),
1170                        }
1171                    );
1172                }
1173                Ok(QueryResult::status(format!(
1174                    "Created {} index '{}'",
1175                    index_type_str, stmt.name
1176                )))
1177            }
1178            SchemaStatement::DropIndex { name, if_exists } => {
1179                // Look up the index by name in the catalog to find the
1180                // underlying property, then drop from both catalog and store.
1181                if let Some(index_id) = self.catalog.find_index_by_name(&name) {
1182                    let def = self.catalog.get_index(index_id);
1183                    self.catalog.drop_index(index_id);
1184                    if let Some(def) = def
1185                        && let Some(prop_name) =
1186                            self.catalog.get_property_key_name(def.property_key)
1187                    {
1188                        self.active_lpg_store().drop_property_index(&prop_name);
1189                    }
1190                    wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1191                    Ok(QueryResult::status(format!("Dropped index '{name}'")))
1192                } else if if_exists {
1193                    Ok(QueryResult::status("No change".to_string()))
1194                } else {
1195                    Err(Error::Query(QueryError::new(
1196                        QueryErrorKind::Semantic,
1197                        format!("Index '{name}' does not exist"),
1198                    )))
1199                }
1200            }
1201            SchemaStatement::CreateConstraint(stmt) => {
1202                use crate::catalog::TypeConstraint;
1203                use grafeo_adapters::query::gql::ast::ConstraintKind;
1204                let kind_str = match stmt.constraint_kind {
1205                    ConstraintKind::Unique => "unique",
1206                    ConstraintKind::NodeKey => "node_key",
1207                    ConstraintKind::NotNull => "not_null",
1208                    ConstraintKind::Exists => "exists",
1209                };
1210                let constraint_name = stmt
1211                    .name
1212                    .clone()
1213                    .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1214
1215                // Register constraint in catalog type definitions
1216                match stmt.constraint_kind {
1217                    ConstraintKind::Unique => {
1218                        for prop in &stmt.properties {
1219                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1220                            let prop_id = self.catalog.get_or_create_property_key(prop);
1221                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1222                        }
1223                        let _ = self.catalog.add_constraint_to_type(
1224                            &stmt.label,
1225                            TypeConstraint::Unique(stmt.properties.clone()),
1226                        );
1227                    }
1228                    ConstraintKind::NodeKey => {
1229                        for prop in &stmt.properties {
1230                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1231                            let prop_id = self.catalog.get_or_create_property_key(prop);
1232                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1233                            let _ = self.catalog.add_required_property(label_id, prop_id);
1234                        }
1235                        let _ = self.catalog.add_constraint_to_type(
1236                            &stmt.label,
1237                            TypeConstraint::PrimaryKey(stmt.properties.clone()),
1238                        );
1239                    }
1240                    ConstraintKind::NotNull | ConstraintKind::Exists => {
1241                        for prop in &stmt.properties {
1242                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1243                            let prop_id = self.catalog.get_or_create_property_key(prop);
1244                            let _ = self.catalog.add_required_property(label_id, prop_id);
1245                            let _ = self.catalog.add_constraint_to_type(
1246                                &stmt.label,
1247                                TypeConstraint::NotNull(prop.clone()),
1248                            );
1249                        }
1250                    }
1251                }
1252
1253                wal_log!(
1254                    self,
1255                    WalRecord::CreateConstraint {
1256                        name: constraint_name.clone(),
1257                        label: stmt.label.clone(),
1258                        properties: stmt.properties.clone(),
1259                        kind: kind_str.to_string(),
1260                    }
1261                );
1262                Ok(QueryResult::status(format!(
1263                    "Created {kind_str} constraint '{constraint_name}'"
1264                )))
1265            }
1266            SchemaStatement::DropConstraint { name, if_exists } => {
1267                let _ = if_exists;
1268                wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1269                Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1270            }
1271            SchemaStatement::CreateGraphType(stmt) => {
1272                use crate::catalog::GraphTypeDefinition;
1273                use grafeo_adapters::query::gql::ast::InlineElementType;
1274
1275                let effective_name = self.effective_type_key(&stmt.name);
1276
1277                // GG04: LIKE clause copies type from existing graph
1278                let (mut node_types, mut edge_types, open) =
1279                    if let Some(ref like_graph) = stmt.like_graph {
1280                        // Infer types from the graph's bound type, or use its existing types
1281                        if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1282                            if let Some(existing) = self
1283                                .catalog
1284                                .schema()
1285                                .and_then(|s| s.get_graph_type(&type_name))
1286                            {
1287                                (
1288                                    existing.allowed_node_types.clone(),
1289                                    existing.allowed_edge_types.clone(),
1290                                    existing.open,
1291                                )
1292                            } else {
1293                                (Vec::new(), Vec::new(), true)
1294                            }
1295                        } else {
1296                            // GG22: Infer from graph data (labels used in graph)
1297                            let nt = self.catalog.all_node_type_names();
1298                            let et = self.catalog.all_edge_type_names();
1299                            if nt.is_empty() && et.is_empty() {
1300                                (Vec::new(), Vec::new(), true)
1301                            } else {
1302                                (nt, et, false)
1303                            }
1304                        }
1305                    } else {
1306                        // Prefix element type names with schema for consistency
1307                        let nt = stmt
1308                            .node_types
1309                            .iter()
1310                            .map(|n| self.effective_type_key(n))
1311                            .collect();
1312                        let et = stmt
1313                            .edge_types
1314                            .iter()
1315                            .map(|n| self.effective_type_key(n))
1316                            .collect();
1317                        (nt, et, stmt.open)
1318                    };
1319
1320                // GG03: Register inline element types and add their names
1321                for inline in &stmt.inline_types {
1322                    match inline {
1323                        InlineElementType::Node {
1324                            name,
1325                            properties,
1326                            key_labels,
1327                            ..
1328                        } => {
1329                            let inline_effective = self.effective_type_key(name);
1330                            let def = NodeTypeDefinition {
1331                                name: inline_effective.clone(),
1332                                properties: properties
1333                                    .iter()
1334                                    .map(|p| TypedProperty {
1335                                        name: p.name.clone(),
1336                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1337                                        nullable: p.nullable,
1338                                        default_value: None,
1339                                    })
1340                                    .collect(),
1341                                constraints: Vec::new(),
1342                                parent_types: key_labels.clone(),
1343                            };
1344                            // Register or replace so inline defs override existing
1345                            self.catalog.register_or_replace_node_type(def);
1346                            #[cfg(feature = "wal")]
1347                            {
1348                                let props_for_wal: Vec<(String, String, bool)> = properties
1349                                    .iter()
1350                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1351                                    .collect();
1352                                self.log_schema_wal(&WalRecord::CreateNodeType {
1353                                    name: inline_effective.clone(),
1354                                    properties: props_for_wal,
1355                                    constraints: Vec::new(),
1356                                });
1357                            }
1358                            if !node_types.contains(&inline_effective) {
1359                                node_types.push(inline_effective);
1360                            }
1361                        }
1362                        InlineElementType::Edge {
1363                            name,
1364                            properties,
1365                            source_node_types,
1366                            target_node_types,
1367                            ..
1368                        } => {
1369                            let inline_effective = self.effective_type_key(name);
1370                            let def = EdgeTypeDefinition {
1371                                name: inline_effective.clone(),
1372                                properties: properties
1373                                    .iter()
1374                                    .map(|p| TypedProperty {
1375                                        name: p.name.clone(),
1376                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1377                                        nullable: p.nullable,
1378                                        default_value: None,
1379                                    })
1380                                    .collect(),
1381                                constraints: Vec::new(),
1382                                source_node_types: source_node_types.clone(),
1383                                target_node_types: target_node_types.clone(),
1384                            };
1385                            self.catalog.register_or_replace_edge_type_def(def);
1386                            #[cfg(feature = "wal")]
1387                            {
1388                                let props_for_wal: Vec<(String, String, bool)> = properties
1389                                    .iter()
1390                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1391                                    .collect();
1392                                self.log_schema_wal(&WalRecord::CreateEdgeType {
1393                                    name: inline_effective.clone(),
1394                                    properties: props_for_wal,
1395                                    constraints: Vec::new(),
1396                                });
1397                            }
1398                            if !edge_types.contains(&inline_effective) {
1399                                edge_types.push(inline_effective);
1400                            }
1401                        }
1402                    }
1403                }
1404
1405                let def = GraphTypeDefinition {
1406                    name: effective_name.clone(),
1407                    allowed_node_types: node_types.clone(),
1408                    allowed_edge_types: edge_types.clone(),
1409                    open,
1410                };
1411                let result = if stmt.or_replace {
1412                    // Drop existing first, ignore error if not found
1413                    let _ = self.catalog.drop_graph_type(&effective_name);
1414                    self.catalog.register_graph_type(def)
1415                } else {
1416                    self.catalog.register_graph_type(def)
1417                };
1418                match result {
1419                    Ok(()) => {
1420                        wal_log!(
1421                            self,
1422                            WalRecord::CreateGraphType {
1423                                name: effective_name.clone(),
1424                                node_types,
1425                                edge_types,
1426                                open,
1427                            }
1428                        );
1429                        Ok(QueryResult::status(format!(
1430                            "Created graph type '{}'",
1431                            stmt.name
1432                        )))
1433                    }
1434                    Err(e) if stmt.if_not_exists => {
1435                        let _ = e;
1436                        Ok(QueryResult::status("No change"))
1437                    }
1438                    Err(e) => Err(Error::Query(QueryError::new(
1439                        QueryErrorKind::Semantic,
1440                        e.to_string(),
1441                    ))),
1442                }
1443            }
1444            SchemaStatement::DropGraphType { name, if_exists } => {
1445                let effective_name = self.effective_type_key(&name);
1446                match self.catalog.drop_graph_type(&effective_name) {
1447                    Ok(()) => {
1448                        wal_log!(
1449                            self,
1450                            WalRecord::DropGraphType {
1451                                name: effective_name
1452                            }
1453                        );
1454                        Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1455                    }
1456                    Err(e) if if_exists => {
1457                        let _ = e;
1458                        Ok(QueryResult::status("No change"))
1459                    }
1460                    Err(e) => Err(Error::Query(QueryError::new(
1461                        QueryErrorKind::Semantic,
1462                        e.to_string(),
1463                    ))),
1464                }
1465            }
1466            SchemaStatement::CreateSchema {
1467                name,
1468                if_not_exists,
1469            } => match self.catalog.register_schema_namespace(name.clone()) {
1470                Ok(()) => {
1471                    wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1472                    // Auto-create the schema's default graph partition so that
1473                    // SESSION SET SCHEMA + queries work without an explicit graph.
1474                    let default_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1475                    if self.store.create_graph(&default_key).unwrap_or(false) {
1476                        wal_log!(self, WalRecord::CreateNamedGraph { name: default_key });
1477                    }
1478                    Ok(QueryResult::status(format!("Created schema '{name}'")))
1479                }
1480                Err(e) if if_not_exists => {
1481                    let _ = e;
1482                    Ok(QueryResult::status("No change"))
1483                }
1484                Err(e) => Err(Error::Query(QueryError::new(
1485                    QueryErrorKind::Semantic,
1486                    e.to_string(),
1487                ))),
1488            },
1489            SchemaStatement::DropSchema { name, if_exists } => {
1490                // ISO/IEC 39075 Section 12.3: schema must be empty before dropping.
1491                // The auto-created __default__ graph is exempt from the check.
1492                let prefix = format!("{name}/");
1493                let default_graph_key = format!("{name}/{SCHEMA_DEFAULT_GRAPH}");
1494                let has_graphs = self
1495                    .store
1496                    .graph_names()
1497                    .iter()
1498                    .any(|g| g.starts_with(&prefix) && *g != default_graph_key);
1499                let has_types = self
1500                    .catalog
1501                    .all_node_type_names()
1502                    .iter()
1503                    .any(|n| n.starts_with(&prefix))
1504                    || self
1505                        .catalog
1506                        .all_edge_type_names()
1507                        .iter()
1508                        .any(|n| n.starts_with(&prefix))
1509                    || self
1510                        .catalog
1511                        .all_graph_type_names()
1512                        .iter()
1513                        .any(|n| n.starts_with(&prefix));
1514                if has_graphs || has_types {
1515                    return Err(Error::Query(QueryError::new(
1516                        QueryErrorKind::Semantic,
1517                        format!("Schema '{name}' is not empty: drop all graphs and types first"),
1518                    )));
1519                }
1520                match self.catalog.drop_schema_namespace(&name) {
1521                    Ok(()) => {
1522                        wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1523                        // Drop the auto-created default graph partition
1524                        if self.store.drop_graph(&default_graph_key) {
1525                            wal_log!(
1526                                self,
1527                                WalRecord::DropNamedGraph {
1528                                    name: default_graph_key,
1529                                }
1530                            );
1531                        }
1532                        // If this session was using the dropped schema, reset it
1533                        let mut current = self.current_schema.lock();
1534                        if current
1535                            .as_deref()
1536                            .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1537                        {
1538                            *current = None;
1539                        }
1540                        Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1541                    }
1542                    Err(e) if if_exists => {
1543                        let _ = e;
1544                        Ok(QueryResult::status("No change"))
1545                    }
1546                    Err(e) => Err(Error::Query(QueryError::new(
1547                        QueryErrorKind::Semantic,
1548                        e.to_string(),
1549                    ))),
1550                }
1551            }
1552            SchemaStatement::AlterNodeType(stmt) => {
1553                use grafeo_adapters::query::gql::ast::TypeAlteration;
1554                let effective_name = self.effective_type_key(&stmt.name);
1555                let mut wal_alts = Vec::new();
1556                for alt in &stmt.alterations {
1557                    match alt {
1558                        TypeAlteration::AddProperty(prop) => {
1559                            let typed = TypedProperty {
1560                                name: prop.name.clone(),
1561                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1562                                nullable: prop.nullable,
1563                                default_value: prop
1564                                    .default_value
1565                                    .as_ref()
1566                                    .map(|s| parse_default_literal(s)),
1567                            };
1568                            self.catalog
1569                                .alter_node_type_add_property(&effective_name, typed)
1570                                .map_err(|e| {
1571                                    Error::Query(QueryError::new(
1572                                        QueryErrorKind::Semantic,
1573                                        e.to_string(),
1574                                    ))
1575                                })?;
1576                            wal_alts.push((
1577                                "add".to_string(),
1578                                prop.name.clone(),
1579                                prop.data_type.clone(),
1580                                prop.nullable,
1581                            ));
1582                        }
1583                        TypeAlteration::DropProperty(name) => {
1584                            self.catalog
1585                                .alter_node_type_drop_property(&effective_name, name)
1586                                .map_err(|e| {
1587                                    Error::Query(QueryError::new(
1588                                        QueryErrorKind::Semantic,
1589                                        e.to_string(),
1590                                    ))
1591                                })?;
1592                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1593                        }
1594                    }
1595                }
1596                wal_log!(
1597                    self,
1598                    WalRecord::AlterNodeType {
1599                        name: effective_name,
1600                        alterations: wal_alts,
1601                    }
1602                );
1603                Ok(QueryResult::status(format!(
1604                    "Altered node type '{}'",
1605                    stmt.name
1606                )))
1607            }
1608            SchemaStatement::AlterEdgeType(stmt) => {
1609                use grafeo_adapters::query::gql::ast::TypeAlteration;
1610                let effective_name = self.effective_type_key(&stmt.name);
1611                let mut wal_alts = Vec::new();
1612                for alt in &stmt.alterations {
1613                    match alt {
1614                        TypeAlteration::AddProperty(prop) => {
1615                            let typed = TypedProperty {
1616                                name: prop.name.clone(),
1617                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1618                                nullable: prop.nullable,
1619                                default_value: prop
1620                                    .default_value
1621                                    .as_ref()
1622                                    .map(|s| parse_default_literal(s)),
1623                            };
1624                            self.catalog
1625                                .alter_edge_type_add_property(&effective_name, typed)
1626                                .map_err(|e| {
1627                                    Error::Query(QueryError::new(
1628                                        QueryErrorKind::Semantic,
1629                                        e.to_string(),
1630                                    ))
1631                                })?;
1632                            wal_alts.push((
1633                                "add".to_string(),
1634                                prop.name.clone(),
1635                                prop.data_type.clone(),
1636                                prop.nullable,
1637                            ));
1638                        }
1639                        TypeAlteration::DropProperty(name) => {
1640                            self.catalog
1641                                .alter_edge_type_drop_property(&effective_name, name)
1642                                .map_err(|e| {
1643                                    Error::Query(QueryError::new(
1644                                        QueryErrorKind::Semantic,
1645                                        e.to_string(),
1646                                    ))
1647                                })?;
1648                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1649                        }
1650                    }
1651                }
1652                wal_log!(
1653                    self,
1654                    WalRecord::AlterEdgeType {
1655                        name: effective_name,
1656                        alterations: wal_alts,
1657                    }
1658                );
1659                Ok(QueryResult::status(format!(
1660                    "Altered edge type '{}'",
1661                    stmt.name
1662                )))
1663            }
1664            SchemaStatement::AlterGraphType(stmt) => {
1665                use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1666                let effective_name = self.effective_type_key(&stmt.name);
1667                let mut wal_alts = Vec::new();
1668                for alt in &stmt.alterations {
1669                    match alt {
1670                        GraphTypeAlteration::AddNodeType(name) => {
1671                            self.catalog
1672                                .alter_graph_type_add_node_type(&effective_name, name.clone())
1673                                .map_err(|e| {
1674                                    Error::Query(QueryError::new(
1675                                        QueryErrorKind::Semantic,
1676                                        e.to_string(),
1677                                    ))
1678                                })?;
1679                            wal_alts.push(("add_node_type".to_string(), name.clone()));
1680                        }
1681                        GraphTypeAlteration::DropNodeType(name) => {
1682                            self.catalog
1683                                .alter_graph_type_drop_node_type(&effective_name, name)
1684                                .map_err(|e| {
1685                                    Error::Query(QueryError::new(
1686                                        QueryErrorKind::Semantic,
1687                                        e.to_string(),
1688                                    ))
1689                                })?;
1690                            wal_alts.push(("drop_node_type".to_string(), name.clone()));
1691                        }
1692                        GraphTypeAlteration::AddEdgeType(name) => {
1693                            self.catalog
1694                                .alter_graph_type_add_edge_type(&effective_name, name.clone())
1695                                .map_err(|e| {
1696                                    Error::Query(QueryError::new(
1697                                        QueryErrorKind::Semantic,
1698                                        e.to_string(),
1699                                    ))
1700                                })?;
1701                            wal_alts.push(("add_edge_type".to_string(), name.clone()));
1702                        }
1703                        GraphTypeAlteration::DropEdgeType(name) => {
1704                            self.catalog
1705                                .alter_graph_type_drop_edge_type(&effective_name, name)
1706                                .map_err(|e| {
1707                                    Error::Query(QueryError::new(
1708                                        QueryErrorKind::Semantic,
1709                                        e.to_string(),
1710                                    ))
1711                                })?;
1712                            wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1713                        }
1714                    }
1715                }
1716                wal_log!(
1717                    self,
1718                    WalRecord::AlterGraphType {
1719                        name: effective_name,
1720                        alterations: wal_alts,
1721                    }
1722                );
1723                Ok(QueryResult::status(format!(
1724                    "Altered graph type '{}'",
1725                    stmt.name
1726                )))
1727            }
1728            SchemaStatement::CreateProcedure(stmt) => {
1729                use crate::catalog::ProcedureDefinition;
1730
1731                let def = ProcedureDefinition {
1732                    name: stmt.name.clone(),
1733                    params: stmt
1734                        .params
1735                        .iter()
1736                        .map(|p| (p.name.clone(), p.param_type.clone()))
1737                        .collect(),
1738                    returns: stmt
1739                        .returns
1740                        .iter()
1741                        .map(|r| (r.name.clone(), r.return_type.clone()))
1742                        .collect(),
1743                    body: stmt.body.clone(),
1744                };
1745
1746                if stmt.or_replace {
1747                    self.catalog.replace_procedure(def).map_err(|e| {
1748                        Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1749                    })?;
1750                } else {
1751                    match self.catalog.register_procedure(def) {
1752                        Ok(()) => {}
1753                        Err(_) if stmt.if_not_exists => {
1754                            return Ok(QueryResult::empty());
1755                        }
1756                        Err(e) => {
1757                            return Err(Error::Query(QueryError::new(
1758                                QueryErrorKind::Semantic,
1759                                e.to_string(),
1760                            )));
1761                        }
1762                    }
1763                }
1764
1765                wal_log!(
1766                    self,
1767                    WalRecord::CreateProcedure {
1768                        name: stmt.name.clone(),
1769                        params: stmt
1770                            .params
1771                            .iter()
1772                            .map(|p| (p.name.clone(), p.param_type.clone()))
1773                            .collect(),
1774                        returns: stmt
1775                            .returns
1776                            .iter()
1777                            .map(|r| (r.name.clone(), r.return_type.clone()))
1778                            .collect(),
1779                        body: stmt.body,
1780                    }
1781                );
1782                Ok(QueryResult::status(format!(
1783                    "Created procedure '{}'",
1784                    stmt.name
1785                )))
1786            }
1787            SchemaStatement::DropProcedure { name, if_exists } => {
1788                match self.catalog.drop_procedure(&name) {
1789                    Ok(()) => {}
1790                    Err(_) if if_exists => {
1791                        return Ok(QueryResult::empty());
1792                    }
1793                    Err(e) => {
1794                        return Err(Error::Query(QueryError::new(
1795                            QueryErrorKind::Semantic,
1796                            e.to_string(),
1797                        )));
1798                    }
1799                }
1800                wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1801                Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1802            }
1803            SchemaStatement::ShowIndexes => {
1804                return self.execute_show_indexes();
1805            }
1806            SchemaStatement::ShowConstraints => {
1807                return self.execute_show_constraints();
1808            }
1809            SchemaStatement::ShowNodeTypes => {
1810                return self.execute_show_node_types();
1811            }
1812            SchemaStatement::ShowEdgeTypes => {
1813                return self.execute_show_edge_types();
1814            }
1815            SchemaStatement::ShowGraphTypes => {
1816                return self.execute_show_graph_types();
1817            }
1818            SchemaStatement::ShowGraphType(name) => {
1819                return self.execute_show_graph_type(&name);
1820            }
1821            SchemaStatement::ShowCurrentGraphType => {
1822                return self.execute_show_current_graph_type();
1823            }
1824            SchemaStatement::ShowGraphs => {
1825                return self.execute_show_graphs();
1826            }
1827            SchemaStatement::ShowSchemas => {
1828                return self.execute_show_schemas();
1829            }
1830        };
1831
1832        // Invalidate all cached query plans after any successful DDL change.
1833        // DDL is rare, so clearing the entire cache is cheap and correct.
1834        if result.is_ok() {
1835            self.query_cache.clear();
1836        }
1837
1838        result
1839    }
1840
1841    /// Creates a vector index on the store by scanning existing nodes.
1842    #[cfg(all(feature = "gql", feature = "vector-index"))]
1843    fn create_vector_index_on_store(
1844        store: &LpgStore,
1845        label: &str,
1846        property: &str,
1847        dimensions: Option<usize>,
1848        metric: Option<&str>,
1849    ) -> Result<()> {
1850        use grafeo_common::types::{PropertyKey, Value};
1851        use grafeo_common::utils::error::Error;
1852        use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};
1853
1854        let metric = match metric {
1855            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1856                Error::Internal(format!(
1857                    "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
1858                ))
1859            })?,
1860            None => DistanceMetric::Cosine,
1861        };
1862
1863        let prop_key = PropertyKey::new(property);
1864        let mut found_dims: Option<usize> = dimensions;
1865        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1866
1867        for node in store.nodes_with_label(label) {
1868            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1869                if let Some(expected) = found_dims {
1870                    if v.len() != expected {
1871                        return Err(Error::Internal(format!(
1872                            "Vector dimension mismatch: expected {expected}, found {} on node {}",
1873                            v.len(),
1874                            node.id.0
1875                        )));
1876                    }
1877                } else {
1878                    found_dims = Some(v.len());
1879                }
1880                vectors.push((node.id, v.to_vec()));
1881            }
1882        }
1883
1884        let Some(dims) = found_dims else {
1885            return Err(Error::Internal(format!(
1886                "No vector properties found on :{label}({property}) and no dimensions specified"
1887            )));
1888        };
1889
1890        let config = HnswConfig::new(dims, metric);
1891        let index = HnswIndex::with_capacity(config, vectors.len());
1892        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
1893        for (node_id, vec) in &vectors {
1894            index.insert(*node_id, vec, &accessor);
1895        }
1896
1897        store.add_vector_index(label, property, Arc::new(index));
1898        Ok(())
1899    }
1900
1901    /// Stub for when vector-index feature is not enabled.
1902    #[cfg(all(feature = "gql", not(feature = "vector-index")))]
1903    fn create_vector_index_on_store(
1904        _store: &LpgStore,
1905        _label: &str,
1906        _property: &str,
1907        _dimensions: Option<usize>,
1908        _metric: Option<&str>,
1909    ) -> Result<()> {
1910        Err(grafeo_common::utils::error::Error::Internal(
1911            "Vector index support requires the 'vector-index' feature".to_string(),
1912        ))
1913    }
1914
1915    /// Creates a text index on the store by scanning existing nodes.
1916    #[cfg(all(feature = "gql", feature = "text-index"))]
1917    fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
1918        use grafeo_common::types::{PropertyKey, Value};
1919        use grafeo_core::index::text::{BM25Config, InvertedIndex};
1920
1921        let mut index = InvertedIndex::new(BM25Config::default());
1922        let prop_key = PropertyKey::new(property);
1923
1924        let nodes = store.nodes_by_label(label);
1925        for node_id in nodes {
1926            if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
1927                index.insert(node_id, text.as_str());
1928            }
1929        }
1930
1931        store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
1932        Ok(())
1933    }
1934
1935    /// Stub for when text-index feature is not enabled.
1936    #[cfg(all(feature = "gql", not(feature = "text-index")))]
1937    fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
1938        Err(grafeo_common::utils::error::Error::Internal(
1939            "Text index support requires the 'text-index' feature".to_string(),
1940        ))
1941    }
1942
1943    /// Returns a table of all indexes from the catalog.
1944    fn execute_show_indexes(&self) -> Result<QueryResult> {
1945        let indexes = self.catalog.all_indexes();
1946        let columns = vec![
1947            "name".to_string(),
1948            "type".to_string(),
1949            "label".to_string(),
1950            "property".to_string(),
1951        ];
1952        let rows: Vec<Vec<Value>> = indexes
1953            .into_iter()
1954            .map(|def| {
1955                let label_name = self
1956                    .catalog
1957                    .get_label_name(def.label)
1958                    .unwrap_or_else(|| "?".into());
1959                let prop_name = self
1960                    .catalog
1961                    .get_property_key_name(def.property_key)
1962                    .unwrap_or_else(|| "?".into());
1963                vec![
1964                    Value::from(def.name),
1965                    Value::from(format!("{:?}", def.index_type)),
1966                    Value::from(&*label_name),
1967                    Value::from(&*prop_name),
1968                ]
1969            })
1970            .collect();
1971        Ok(QueryResult {
1972            columns,
1973            column_types: Vec::new(),
1974            rows,
1975            ..QueryResult::empty()
1976        })
1977    }
1978
1979    /// Returns a table of all constraints (currently metadata-only).
1980    fn execute_show_constraints(&self) -> Result<QueryResult> {
1981        // Constraints are tracked in WAL but not yet in a queryable catalog.
1982        // Return an empty table with the expected schema.
1983        Ok(QueryResult {
1984            columns: vec![
1985                "name".to_string(),
1986                "type".to_string(),
1987                "label".to_string(),
1988                "properties".to_string(),
1989            ],
1990            column_types: Vec::new(),
1991            rows: Vec::new(),
1992            ..QueryResult::empty()
1993        })
1994    }
1995
1996    /// Returns a table of all registered node types in the current schema.
1997    fn execute_show_node_types(&self) -> Result<QueryResult> {
1998        let columns = vec![
1999            "name".to_string(),
2000            "properties".to_string(),
2001            "constraints".to_string(),
2002            "parents".to_string(),
2003        ];
2004        let schema = self.current_schema.lock().clone();
2005        let all_names = self.catalog.all_node_type_names();
2006        let type_names: Vec<String> = match &schema {
2007            Some(s) => {
2008                let prefix = format!("{s}/");
2009                all_names
2010                    .into_iter()
2011                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2012                    .collect()
2013            }
2014            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2015        };
2016        let rows: Vec<Vec<Value>> = type_names
2017            .into_iter()
2018            .filter_map(|name| {
2019                let lookup = match &schema {
2020                    Some(s) => format!("{s}/{name}"),
2021                    None => name.clone(),
2022                };
2023                let def = self.catalog.get_node_type(&lookup)?;
2024                let props: Vec<String> = def
2025                    .properties
2026                    .iter()
2027                    .map(|p| {
2028                        let nullable = if p.nullable { "" } else { " NOT NULL" };
2029                        format!("{} {}{}", p.name, p.data_type, nullable)
2030                    })
2031                    .collect();
2032                let constraints: Vec<String> =
2033                    def.constraints.iter().map(|c| format!("{c:?}")).collect();
2034                let parents = def.parent_types.join(", ");
2035                Some(vec![
2036                    Value::from(name),
2037                    Value::from(props.join(", ")),
2038                    Value::from(constraints.join(", ")),
2039                    Value::from(parents),
2040                ])
2041            })
2042            .collect();
2043        Ok(QueryResult {
2044            columns,
2045            column_types: Vec::new(),
2046            rows,
2047            ..QueryResult::empty()
2048        })
2049    }
2050
2051    /// Returns a table of all registered edge types in the current schema.
2052    fn execute_show_edge_types(&self) -> Result<QueryResult> {
2053        let columns = vec![
2054            "name".to_string(),
2055            "properties".to_string(),
2056            "source_types".to_string(),
2057            "target_types".to_string(),
2058        ];
2059        let schema = self.current_schema.lock().clone();
2060        let all_names = self.catalog.all_edge_type_names();
2061        let type_names: Vec<String> = match &schema {
2062            Some(s) => {
2063                let prefix = format!("{s}/");
2064                all_names
2065                    .into_iter()
2066                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2067                    .collect()
2068            }
2069            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2070        };
2071        let rows: Vec<Vec<Value>> = type_names
2072            .into_iter()
2073            .filter_map(|name| {
2074                let lookup = match &schema {
2075                    Some(s) => format!("{s}/{name}"),
2076                    None => name.clone(),
2077                };
2078                let def = self.catalog.get_edge_type_def(&lookup)?;
2079                let props: Vec<String> = def
2080                    .properties
2081                    .iter()
2082                    .map(|p| {
2083                        let nullable = if p.nullable { "" } else { " NOT NULL" };
2084                        format!("{} {}{}", p.name, p.data_type, nullable)
2085                    })
2086                    .collect();
2087                let src = def.source_node_types.join(", ");
2088                let tgt = def.target_node_types.join(", ");
2089                Some(vec![
2090                    Value::from(name),
2091                    Value::from(props.join(", ")),
2092                    Value::from(src),
2093                    Value::from(tgt),
2094                ])
2095            })
2096            .collect();
2097        Ok(QueryResult {
2098            columns,
2099            column_types: Vec::new(),
2100            rows,
2101            ..QueryResult::empty()
2102        })
2103    }
2104
2105    /// Returns a table of all registered graph types in the current schema.
2106    fn execute_show_graph_types(&self) -> Result<QueryResult> {
2107        let columns = vec![
2108            "name".to_string(),
2109            "open".to_string(),
2110            "node_types".to_string(),
2111            "edge_types".to_string(),
2112        ];
2113        let schema = self.current_schema.lock().clone();
2114        let all_names = self.catalog.all_graph_type_names();
2115        let type_names: Vec<String> = match &schema {
2116            Some(s) => {
2117                let prefix = format!("{s}/");
2118                all_names
2119                    .into_iter()
2120                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2121                    .collect()
2122            }
2123            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2124        };
2125        let rows: Vec<Vec<Value>> = type_names
2126            .into_iter()
2127            .filter_map(|name| {
2128                let lookup = match &schema {
2129                    Some(s) => format!("{s}/{name}"),
2130                    None => name.clone(),
2131                };
2132                let def = self.catalog.get_graph_type_def(&lookup)?;
2133                // Strip schema prefix from allowed type names for display
2134                let strip = |n: &String| -> String {
2135                    match &schema {
2136                        Some(s) => n.strip_prefix(&format!("{s}/")).unwrap_or(n).to_string(),
2137                        None => n.clone(),
2138                    }
2139                };
2140                let node_types: Vec<String> = def.allowed_node_types.iter().map(strip).collect();
2141                let edge_types: Vec<String> = def.allowed_edge_types.iter().map(strip).collect();
2142                Some(vec![
2143                    Value::from(name),
2144                    Value::from(def.open),
2145                    Value::from(node_types.join(", ")),
2146                    Value::from(edge_types.join(", ")),
2147                ])
2148            })
2149            .collect();
2150        Ok(QueryResult {
2151            columns,
2152            column_types: Vec::new(),
2153            rows,
2154            ..QueryResult::empty()
2155        })
2156    }
2157
2158    /// Returns the list of named graphs visible in the current schema context.
2159    ///
2160    /// When a session schema is set, only graphs belonging to that schema are
2161    /// shown (their compound prefix is stripped). When no schema is set, graphs
2162    /// without a schema prefix are shown (the default schema).
2163    fn execute_show_graphs(&self) -> Result<QueryResult> {
2164        let schema = self.current_schema.lock().clone();
2165        let all_names = self.store.graph_names();
2166
2167        let mut names: Vec<String> = match &schema {
2168            Some(s) => {
2169                let prefix = format!("{s}/");
2170                all_names
2171                    .into_iter()
2172                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2173                    .filter(|n| n != SCHEMA_DEFAULT_GRAPH)
2174                    .collect()
2175            }
2176            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2177        };
2178        names.sort();
2179
2180        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2181        Ok(QueryResult {
2182            columns: vec!["name".to_string()],
2183            column_types: Vec::new(),
2184            rows,
2185            ..QueryResult::empty()
2186        })
2187    }
2188
2189    /// Returns the list of all schema namespaces.
2190    fn execute_show_schemas(&self) -> Result<QueryResult> {
2191        let mut names = self.catalog.schema_names();
2192        names.sort();
2193        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2194        Ok(QueryResult {
2195            columns: vec!["name".to_string()],
2196            column_types: Vec::new(),
2197            rows,
2198            ..QueryResult::empty()
2199        })
2200    }
2201
2202    /// Returns detailed info for a specific graph type.
2203    fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
2204        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
2205
2206        let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
2207            Error::Query(QueryError::new(
2208                QueryErrorKind::Semantic,
2209                format!("Graph type '{name}' not found"),
2210            ))
2211        })?;
2212
2213        let columns = vec![
2214            "name".to_string(),
2215            "open".to_string(),
2216            "node_types".to_string(),
2217            "edge_types".to_string(),
2218        ];
2219        let rows = vec![vec![
2220            Value::from(def.name),
2221            Value::from(def.open),
2222            Value::from(def.allowed_node_types.join(", ")),
2223            Value::from(def.allowed_edge_types.join(", ")),
2224        ]];
2225        Ok(QueryResult {
2226            columns,
2227            column_types: Vec::new(),
2228            rows,
2229            ..QueryResult::empty()
2230        })
2231    }
2232
2233    /// Returns the graph type bound to the current graph.
2234    fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
2235        let graph_name = self
2236            .current_graph()
2237            .unwrap_or_else(|| "default".to_string());
2238        let columns = vec![
2239            "graph".to_string(),
2240            "graph_type".to_string(),
2241            "open".to_string(),
2242            "node_types".to_string(),
2243            "edge_types".to_string(),
2244        ];
2245
2246        if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
2247            && let Some(def) = self.catalog.get_graph_type_def(&type_name)
2248        {
2249            let rows = vec![vec![
2250                Value::from(graph_name),
2251                Value::from(type_name),
2252                Value::from(def.open),
2253                Value::from(def.allowed_node_types.join(", ")),
2254                Value::from(def.allowed_edge_types.join(", ")),
2255            ]];
2256            return Ok(QueryResult {
2257                columns,
2258                column_types: Vec::new(),
2259                rows,
2260                ..QueryResult::empty()
2261            });
2262        }
2263
2264        // No graph type binding found
2265        Ok(QueryResult {
2266            columns,
2267            column_types: Vec::new(),
2268            rows: vec![vec![
2269                Value::from(graph_name),
2270                Value::Null,
2271                Value::Null,
2272                Value::Null,
2273                Value::Null,
2274            ]],
2275            ..QueryResult::empty()
2276        })
2277    }
2278
2279    /// Executes a GQL query.
2280    ///
2281    /// # Errors
2282    ///
2283    /// Returns an error if the query fails to parse or execute.
2284    ///
2285    /// # Examples
2286    ///
2287    /// ```no_run
2288    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2289    /// use grafeo_engine::GrafeoDB;
2290    ///
2291    /// let db = GrafeoDB::new_in_memory();
2292    /// let session = db.session();
2293    ///
2294    /// // Create a node
2295    /// session.execute("INSERT (:Person {name: 'Alix', age: 30})")?;
2296    ///
2297    /// // Query nodes
2298    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
2299    /// for row in &result.rows {
2300    ///     println!("{:?}", row);
2301    /// }
2302    /// # Ok(())
2303    /// # }
2304    /// ```
2305    #[cfg(feature = "gql")]
2306    pub fn execute(&self, query: &str) -> Result<QueryResult> {
2307        self.require_lpg("GQL")?;
2308
2309        use crate::query::{
2310            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2311            processor::QueryLanguage, translators::gql,
2312        };
2313
2314        let _span = grafeo_info_span!(
2315            "grafeo::session::execute",
2316            language = "gql",
2317            query_len = query.len(),
2318        );
2319
2320        #[cfg(not(target_arch = "wasm32"))]
2321        let start_time = std::time::Instant::now();
2322
2323        // Parse and translate, checking for session/schema commands first
2324        let translation = gql::translate_full(query)?;
2325        let logical_plan = match translation {
2326            gql::GqlTranslationResult::SessionCommand(cmd) => {
2327                return self.execute_session_command(cmd);
2328            }
2329            gql::GqlTranslationResult::SchemaCommand(cmd) => {
2330                // All DDL is a write operation
2331                if *self.read_only_tx.lock() {
2332                    return Err(grafeo_common::utils::error::Error::Transaction(
2333                        grafeo_common::utils::error::TransactionError::ReadOnly,
2334                    ));
2335                }
2336                return self.execute_schema_command(cmd);
2337            }
2338            gql::GqlTranslationResult::Plan(plan) => {
2339                // Block mutations in read-only transactions
2340                if *self.read_only_tx.lock() && plan.root.has_mutations() {
2341                    return Err(grafeo_common::utils::error::Error::Transaction(
2342                        grafeo_common::utils::error::TransactionError::ReadOnly,
2343                    ));
2344                }
2345                plan
2346            }
2347        };
2348
2349        // Create cache key for this query
2350        let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
2351
2352        // Try to get cached optimized plan, or use the plan we just translated
2353        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2354            cached_plan
2355        } else {
2356            // Semantic validation
2357            let mut binder = Binder::new();
2358            let _binding_context = binder.bind(&logical_plan)?;
2359
2360            // Optimize the plan
2361            let active = self.active_store();
2362            let optimizer = Optimizer::from_graph_store(&*active);
2363            let plan = optimizer.optimize(logical_plan)?;
2364
2365            // Cache the optimized plan for future use
2366            self.query_cache.put_optimized(cache_key, plan.clone());
2367
2368            plan
2369        };
2370
2371        // Resolve the active store for query execution
2372        let active = self.active_store();
2373
2374        // EXPLAIN: annotate pushdown hints and return the plan tree
2375        if optimized_plan.explain {
2376            use crate::query::processor::{annotate_pushdown_hints, explain_result};
2377            let mut plan = optimized_plan;
2378            annotate_pushdown_hints(&mut plan.root, active.as_ref());
2379            return Ok(explain_result(&plan));
2380        }
2381
2382        // PROFILE: execute with per-operator instrumentation
2383        if optimized_plan.profile {
2384            let has_mutations = optimized_plan.root.has_mutations();
2385            return self.with_auto_commit(has_mutations, || {
2386                let (viewing_epoch, transaction_id) = self.get_transaction_context();
2387                let planner = self.create_planner_for_store(
2388                    Arc::clone(&active),
2389                    viewing_epoch,
2390                    transaction_id,
2391                );
2392                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2393
2394                let executor = Executor::with_columns(physical_plan.columns.clone())
2395                    .with_deadline(self.query_deadline());
2396                let _result = executor.execute(physical_plan.operator.as_mut())?;
2397
2398                let total_time_ms;
2399                #[cfg(not(target_arch = "wasm32"))]
2400                {
2401                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2402                }
2403                #[cfg(target_arch = "wasm32")]
2404                {
2405                    total_time_ms = 0.0;
2406                }
2407
2408                let profile_tree = crate::query::profile::build_profile_tree(
2409                    &optimized_plan.root,
2410                    &mut entries.into_iter(),
2411                );
2412                Ok(crate::query::profile::profile_result(
2413                    &profile_tree,
2414                    total_time_ms,
2415                ))
2416            });
2417        }
2418
2419        let has_mutations = optimized_plan.root.has_mutations();
2420
2421        let result = self.with_auto_commit(has_mutations, || {
2422            // Get transaction context for MVCC visibility
2423            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2424
2425            // Convert to physical plan with transaction context
2426            // (Physical planning cannot be cached as it depends on transaction state)
2427            // Safe to use read-only fast path when: this query has no mutations AND
2428            // there is no active transaction that may have prior uncommitted writes.
2429            let has_active_tx = self.current_transaction.lock().is_some();
2430            let read_only = !has_mutations && !has_active_tx;
2431            let planner = self.create_planner_for_store_with_read_only(
2432                Arc::clone(&active),
2433                viewing_epoch,
2434                transaction_id,
2435                read_only,
2436            );
2437            let mut physical_plan = planner.plan(&optimized_plan)?;
2438
2439            // Execute the plan
2440            let executor = Executor::with_columns(physical_plan.columns.clone())
2441                .with_deadline(self.query_deadline());
2442            let mut result = executor.execute(physical_plan.operator.as_mut())?;
2443
2444            // Add execution metrics
2445            let rows_scanned = result.rows.len() as u64;
2446            #[cfg(not(target_arch = "wasm32"))]
2447            {
2448                let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2449                result.execution_time_ms = Some(elapsed_ms);
2450            }
2451            result.rows_scanned = Some(rows_scanned);
2452
2453            Ok(result)
2454        });
2455
2456        // Record metrics for this query execution.
2457        #[cfg(feature = "metrics")]
2458        {
2459            #[cfg(not(target_arch = "wasm32"))]
2460            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2461            #[cfg(target_arch = "wasm32")]
2462            let elapsed_ms = None;
2463            self.record_query_metrics("gql", elapsed_ms, &result);
2464        }
2465
2466        result
2467    }
2468
2469    /// Executes a GQL query with visibility at the specified epoch.
2470    ///
2471    /// This enables time-travel queries: the query sees the database
2472    /// as it existed at the given epoch.
2473    ///
2474    /// # Errors
2475    ///
2476    /// Returns an error if parsing or execution fails.
2477    #[cfg(feature = "gql")]
2478    pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
2479        let previous = self.viewing_epoch_override.lock().replace(epoch);
2480        let result = self.execute(query);
2481        *self.viewing_epoch_override.lock() = previous;
2482        result
2483    }
2484
2485    /// Executes a GQL query at a specific epoch with optional parameters.
2486    ///
2487    /// Combines epoch-based time travel with parameterized queries.
2488    ///
2489    /// # Errors
2490    ///
2491    /// Returns an error if parsing or execution fails.
2492    #[cfg(feature = "gql")]
2493    pub fn execute_at_epoch_with_params(
2494        &self,
2495        query: &str,
2496        epoch: EpochId,
2497        params: Option<std::collections::HashMap<String, Value>>,
2498    ) -> Result<QueryResult> {
2499        let previous = self.viewing_epoch_override.lock().replace(epoch);
2500        let result = if let Some(p) = params {
2501            self.execute_with_params(query, p)
2502        } else {
2503            self.execute(query)
2504        };
2505        *self.viewing_epoch_override.lock() = previous;
2506        result
2507    }
2508
2509    /// Executes a GQL query with parameters.
2510    ///
2511    /// # Errors
2512    ///
2513    /// Returns an error if the query fails to parse or execute.
2514    #[cfg(feature = "gql")]
2515    pub fn execute_with_params(
2516        &self,
2517        query: &str,
2518        params: std::collections::HashMap<String, Value>,
2519    ) -> Result<QueryResult> {
2520        self.require_lpg("GQL")?;
2521
2522        use crate::query::processor::{QueryLanguage, QueryProcessor};
2523
2524        let has_mutations = Self::query_looks_like_mutation(query);
2525        let active = self.active_store();
2526
2527        self.with_auto_commit(has_mutations, || {
2528            // Get transaction context for MVCC visibility
2529            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2530
2531            // Create processor with transaction context
2532            let processor = QueryProcessor::for_stores_with_transaction(
2533                Arc::clone(&active),
2534                self.active_write_store(),
2535                Arc::clone(&self.transaction_manager),
2536            )?;
2537
2538            // Apply transaction context if in a transaction
2539            let processor = if let Some(transaction_id) = transaction_id {
2540                processor.with_transaction_context(viewing_epoch, transaction_id)
2541            } else {
2542                processor
2543            };
2544
2545            processor.process(query, QueryLanguage::Gql, Some(&params))
2546        })
2547    }
2548
2549    /// Executes a GQL query with parameters.
2550    ///
2551    /// # Errors
2552    ///
2553    /// Returns an error if no query language is enabled.
2554    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2555    pub fn execute_with_params(
2556        &self,
2557        _query: &str,
2558        _params: std::collections::HashMap<String, Value>,
2559    ) -> Result<QueryResult> {
2560        Err(grafeo_common::utils::error::Error::Internal(
2561            "No query language enabled".to_string(),
2562        ))
2563    }
2564
2565    /// Executes a GQL query.
2566    ///
2567    /// # Errors
2568    ///
2569    /// Returns an error if no query language is enabled.
2570    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2571    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2572        Err(grafeo_common::utils::error::Error::Internal(
2573            "No query language enabled".to_string(),
2574        ))
2575    }
2576
2577    /// Executes a Cypher query.
2578    ///
2579    /// # Errors
2580    ///
2581    /// Returns an error if the query fails to parse or execute.
2582    #[cfg(feature = "cypher")]
2583    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
2584        use crate::query::{
2585            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2586            processor::QueryLanguage, translators::cypher,
2587        };
2588        use grafeo_common::utils::error::{Error as GrafeoError, QueryError, QueryErrorKind};
2589
2590        // Handle schema DDL and SHOW commands before the normal query path
2591        let translation = cypher::translate_full(query)?;
2592        match translation {
2593            cypher::CypherTranslationResult::SchemaCommand(cmd) => {
2594                if *self.read_only_tx.lock() {
2595                    return Err(GrafeoError::Query(QueryError::new(
2596                        QueryErrorKind::Semantic,
2597                        "Cannot execute schema DDL in a read-only transaction",
2598                    )));
2599                }
2600                return self.execute_schema_command(cmd);
2601            }
2602            cypher::CypherTranslationResult::ShowIndexes => {
2603                return self.execute_show_indexes();
2604            }
2605            cypher::CypherTranslationResult::ShowConstraints => {
2606                return self.execute_show_constraints();
2607            }
2608            cypher::CypherTranslationResult::ShowCurrentGraphType => {
2609                return self.execute_show_current_graph_type();
2610            }
2611            cypher::CypherTranslationResult::Plan(_) => {
2612                // Fall through to normal execution below
2613            }
2614        }
2615
2616        #[cfg(not(target_arch = "wasm32"))]
2617        let start_time = std::time::Instant::now();
2618
2619        // Create cache key for this query
2620        let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());
2621
2622        // Try to get cached optimized plan
2623        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2624            cached_plan
2625        } else {
2626            // Parse and translate the query to a logical plan
2627            let logical_plan = cypher::translate(query)?;
2628
2629            // Semantic validation
2630            let mut binder = Binder::new();
2631            let _binding_context = binder.bind(&logical_plan)?;
2632
2633            // Optimize the plan
2634            let active = self.active_store();
2635            let optimizer = Optimizer::from_graph_store(&*active);
2636            let plan = optimizer.optimize(logical_plan)?;
2637
2638            // Cache the optimized plan
2639            self.query_cache.put_optimized(cache_key, plan.clone());
2640
2641            plan
2642        };
2643
2644        // Resolve the active store for query execution
2645        let active = self.active_store();
2646
2647        // EXPLAIN
2648        if optimized_plan.explain {
2649            use crate::query::processor::{annotate_pushdown_hints, explain_result};
2650            let mut plan = optimized_plan;
2651            annotate_pushdown_hints(&mut plan.root, active.as_ref());
2652            return Ok(explain_result(&plan));
2653        }
2654
2655        // PROFILE
2656        if optimized_plan.profile {
2657            let has_mutations = optimized_plan.root.has_mutations();
2658            return self.with_auto_commit(has_mutations, || {
2659                let (viewing_epoch, transaction_id) = self.get_transaction_context();
2660                let planner = self.create_planner_for_store(
2661                    Arc::clone(&active),
2662                    viewing_epoch,
2663                    transaction_id,
2664                );
2665                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2666
2667                let executor = Executor::with_columns(physical_plan.columns.clone())
2668                    .with_deadline(self.query_deadline());
2669                let _result = executor.execute(physical_plan.operator.as_mut())?;
2670
2671                let total_time_ms;
2672                #[cfg(not(target_arch = "wasm32"))]
2673                {
2674                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2675                }
2676                #[cfg(target_arch = "wasm32")]
2677                {
2678                    total_time_ms = 0.0;
2679                }
2680
2681                let profile_tree = crate::query::profile::build_profile_tree(
2682                    &optimized_plan.root,
2683                    &mut entries.into_iter(),
2684                );
2685                Ok(crate::query::profile::profile_result(
2686                    &profile_tree,
2687                    total_time_ms,
2688                ))
2689            });
2690        }
2691
2692        let has_mutations = optimized_plan.root.has_mutations();
2693
2694        let result = self.with_auto_commit(has_mutations, || {
2695            // Get transaction context for MVCC visibility
2696            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2697
2698            // Convert to physical plan with transaction context
2699            let planner =
2700                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2701            let mut physical_plan = planner.plan(&optimized_plan)?;
2702
2703            // Execute the plan
2704            let executor = Executor::with_columns(physical_plan.columns.clone())
2705                .with_deadline(self.query_deadline());
2706            executor.execute(physical_plan.operator.as_mut())
2707        });
2708
2709        #[cfg(feature = "metrics")]
2710        {
2711            #[cfg(not(target_arch = "wasm32"))]
2712            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2713            #[cfg(target_arch = "wasm32")]
2714            let elapsed_ms = None;
2715            self.record_query_metrics("cypher", elapsed_ms, &result);
2716        }
2717
2718        result
2719    }
2720
2721    /// Executes a Gremlin query.
2722    ///
2723    /// # Errors
2724    ///
2725    /// Returns an error if the query fails to parse or execute.
2726    ///
2727    /// # Examples
2728    ///
2729    /// ```no_run
2730    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2731    /// use grafeo_engine::GrafeoDB;
2732    ///
2733    /// let db = GrafeoDB::new_in_memory();
2734    /// let session = db.session();
2735    ///
2736    /// // Create some nodes first
2737    /// session.create_node(&["Person"]);
2738    ///
2739    /// // Query using Gremlin
2740    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
2741    /// # Ok(())
2742    /// # }
2743    /// ```
2744    #[cfg(feature = "gremlin")]
2745    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
2746        use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};
2747
2748        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2749        let start_time = Instant::now();
2750
2751        // Parse and translate the query to a logical plan
2752        let logical_plan = gremlin::translate(query)?;
2753
2754        // Semantic validation
2755        let mut binder = Binder::new();
2756        let _binding_context = binder.bind(&logical_plan)?;
2757
2758        // Optimize the plan
2759        let active = self.active_store();
2760        let optimizer = Optimizer::from_graph_store(&*active);
2761        let optimized_plan = optimizer.optimize(logical_plan)?;
2762
2763        let has_mutations = optimized_plan.root.has_mutations();
2764
2765        let result = self.with_auto_commit(has_mutations, || {
2766            // Get transaction context for MVCC visibility
2767            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2768
2769            // Convert to physical plan with transaction context
2770            let planner =
2771                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2772            let mut physical_plan = planner.plan(&optimized_plan)?;
2773
2774            // Execute the plan
2775            let executor = Executor::with_columns(physical_plan.columns.clone())
2776                .with_deadline(self.query_deadline());
2777            executor.execute(physical_plan.operator.as_mut())
2778        });
2779
2780        #[cfg(feature = "metrics")]
2781        {
2782            #[cfg(not(target_arch = "wasm32"))]
2783            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2784            #[cfg(target_arch = "wasm32")]
2785            let elapsed_ms = None;
2786            self.record_query_metrics("gremlin", elapsed_ms, &result);
2787        }
2788
2789        result
2790    }
2791
2792    /// Executes a Gremlin query with parameters.
2793    ///
2794    /// # Errors
2795    ///
2796    /// Returns an error if the query fails to parse or execute.
2797    #[cfg(feature = "gremlin")]
2798    pub fn execute_gremlin_with_params(
2799        &self,
2800        query: &str,
2801        params: std::collections::HashMap<String, Value>,
2802    ) -> Result<QueryResult> {
2803        use crate::query::processor::{QueryLanguage, QueryProcessor};
2804
2805        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2806        let start_time = Instant::now();
2807
2808        let has_mutations = Self::query_looks_like_mutation(query);
2809        let active = self.active_store();
2810
2811        let result = self.with_auto_commit(has_mutations, || {
2812            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2813            let processor = QueryProcessor::for_stores_with_transaction(
2814                Arc::clone(&active),
2815                self.active_write_store(),
2816                Arc::clone(&self.transaction_manager),
2817            )?;
2818            let processor = if let Some(transaction_id) = transaction_id {
2819                processor.with_transaction_context(viewing_epoch, transaction_id)
2820            } else {
2821                processor
2822            };
2823            processor.process(query, QueryLanguage::Gremlin, Some(&params))
2824        });
2825
2826        #[cfg(feature = "metrics")]
2827        {
2828            #[cfg(not(target_arch = "wasm32"))]
2829            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2830            #[cfg(target_arch = "wasm32")]
2831            let elapsed_ms = None;
2832            self.record_query_metrics("gremlin", elapsed_ms, &result);
2833        }
2834
2835        result
2836    }
2837
2838    /// Executes a GraphQL query against the LPG store.
2839    ///
2840    /// # Errors
2841    ///
2842    /// Returns an error if the query fails to parse or execute.
2843    ///
2844    /// # Examples
2845    ///
2846    /// ```no_run
2847    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2848    /// use grafeo_engine::GrafeoDB;
2849    ///
2850    /// let db = GrafeoDB::new_in_memory();
2851    /// let session = db.session();
2852    ///
2853    /// // Create some nodes first
2854    /// session.create_node(&["User"]);
2855    ///
2856    /// // Query using GraphQL
2857    /// let result = session.execute_graphql("query { user { id name } }")?;
2858    /// # Ok(())
2859    /// # }
2860    /// ```
2861    #[cfg(feature = "graphql")]
2862    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
2863        use crate::query::{
2864            Executor, binder::Binder, optimizer::Optimizer, processor::substitute_params,
2865            translators::graphql,
2866        };
2867
2868        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2869        let start_time = Instant::now();
2870
2871        let mut logical_plan = graphql::translate(query)?;
2872
2873        // Substitute default parameter values from variable declarations
2874        if !logical_plan.default_params.is_empty() {
2875            let defaults = logical_plan.default_params.clone();
2876            substitute_params(&mut logical_plan, &defaults)?;
2877        }
2878
2879        let mut binder = Binder::new();
2880        let _binding_context = binder.bind(&logical_plan)?;
2881
2882        let active = self.active_store();
2883        let optimizer = Optimizer::from_graph_store(&*active);
2884        let optimized_plan = optimizer.optimize(logical_plan)?;
2885        let has_mutations = optimized_plan.root.has_mutations();
2886
2887        let result = self.with_auto_commit(has_mutations, || {
2888            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2889            let planner =
2890                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2891            let mut physical_plan = planner.plan(&optimized_plan)?;
2892            let executor = Executor::with_columns(physical_plan.columns.clone())
2893                .with_deadline(self.query_deadline());
2894            executor.execute(physical_plan.operator.as_mut())
2895        });
2896
2897        #[cfg(feature = "metrics")]
2898        {
2899            #[cfg(not(target_arch = "wasm32"))]
2900            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2901            #[cfg(target_arch = "wasm32")]
2902            let elapsed_ms = None;
2903            self.record_query_metrics("graphql", elapsed_ms, &result);
2904        }
2905
2906        result
2907    }
2908
2909    /// Executes a GraphQL query with parameters.
2910    ///
2911    /// # Errors
2912    ///
2913    /// Returns an error if the query fails to parse or execute.
2914    #[cfg(feature = "graphql")]
2915    pub fn execute_graphql_with_params(
2916        &self,
2917        query: &str,
2918        params: std::collections::HashMap<String, Value>,
2919    ) -> Result<QueryResult> {
2920        use crate::query::processor::{QueryLanguage, QueryProcessor};
2921
2922        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2923        let start_time = Instant::now();
2924
2925        let has_mutations = Self::query_looks_like_mutation(query);
2926        let active = self.active_store();
2927
2928        let result = self.with_auto_commit(has_mutations, || {
2929            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2930            let processor = QueryProcessor::for_stores_with_transaction(
2931                Arc::clone(&active),
2932                self.active_write_store(),
2933                Arc::clone(&self.transaction_manager),
2934            )?;
2935            let processor = if let Some(transaction_id) = transaction_id {
2936                processor.with_transaction_context(viewing_epoch, transaction_id)
2937            } else {
2938                processor
2939            };
2940            processor.process(query, QueryLanguage::GraphQL, Some(&params))
2941        });
2942
2943        #[cfg(feature = "metrics")]
2944        {
2945            #[cfg(not(target_arch = "wasm32"))]
2946            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2947            #[cfg(target_arch = "wasm32")]
2948            let elapsed_ms = None;
2949            self.record_query_metrics("graphql", elapsed_ms, &result);
2950        }
2951
2952        result
2953    }
2954
2955    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
2956    ///
2957    /// # Errors
2958    ///
2959    /// Returns an error if the query fails to parse or execute.
2960    ///
2961    /// # Examples
2962    ///
2963    /// ```no_run
2964    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2965    /// use grafeo_engine::GrafeoDB;
2966    ///
2967    /// let db = GrafeoDB::new_in_memory();
2968    /// let session = db.session();
2969    ///
2970    /// let result = session.execute_sql(
2971    ///     "SELECT * FROM GRAPH_TABLE (
2972    ///         MATCH (n:Person)
2973    ///         COLUMNS (n.name AS name)
2974    ///     )"
2975    /// )?;
2976    /// # Ok(())
2977    /// # }
2978    /// ```
2979    #[cfg(feature = "sql-pgq")]
2980    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
2981        use crate::query::{
2982            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
2983            processor::QueryLanguage, translators::sql_pgq,
2984        };
2985
2986        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2987        let start_time = Instant::now();
2988
2989        // Parse and translate (always needed to check for DDL)
2990        let logical_plan = sql_pgq::translate(query)?;
2991
2992        // Handle DDL statements directly (they don't go through the query pipeline)
2993        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
2994            return Ok(QueryResult {
2995                columns: vec!["status".into()],
2996                column_types: vec![grafeo_common::types::LogicalType::String],
2997                rows: vec![vec![Value::from(format!(
2998                    "Property graph '{}' created ({} node tables, {} edge tables)",
2999                    cpg.name,
3000                    cpg.node_tables.len(),
3001                    cpg.edge_tables.len()
3002                ))]],
3003                execution_time_ms: None,
3004                rows_scanned: None,
3005                status_message: None,
3006                gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
3007            });
3008        }
3009
3010        let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());
3011
3012        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
3013            cached_plan
3014        } else {
3015            let mut binder = Binder::new();
3016            let _binding_context = binder.bind(&logical_plan)?;
3017            let active = self.active_store();
3018            let optimizer = Optimizer::from_graph_store(&*active);
3019            let plan = optimizer.optimize(logical_plan)?;
3020            self.query_cache.put_optimized(cache_key, plan.clone());
3021            plan
3022        };
3023
3024        let active = self.active_store();
3025        let has_mutations = optimized_plan.root.has_mutations();
3026
3027        let result = self.with_auto_commit(has_mutations, || {
3028            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3029            let planner =
3030                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
3031            let mut physical_plan = planner.plan(&optimized_plan)?;
3032            let executor = Executor::with_columns(physical_plan.columns.clone())
3033                .with_deadline(self.query_deadline());
3034            executor.execute(physical_plan.operator.as_mut())
3035        });
3036
3037        #[cfg(feature = "metrics")]
3038        {
3039            #[cfg(not(target_arch = "wasm32"))]
3040            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3041            #[cfg(target_arch = "wasm32")]
3042            let elapsed_ms = None;
3043            self.record_query_metrics("sql", elapsed_ms, &result);
3044        }
3045
3046        result
3047    }
3048
3049    /// Executes a SQL/PGQ query with parameters.
3050    ///
3051    /// # Errors
3052    ///
3053    /// Returns an error if the query fails to parse or execute.
3054    #[cfg(feature = "sql-pgq")]
3055    pub fn execute_sql_with_params(
3056        &self,
3057        query: &str,
3058        params: std::collections::HashMap<String, Value>,
3059    ) -> Result<QueryResult> {
3060        use crate::query::processor::{QueryLanguage, QueryProcessor};
3061
3062        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3063        let start_time = Instant::now();
3064
3065        let has_mutations = Self::query_looks_like_mutation(query);
3066        let active = self.active_store();
3067
3068        let result = self.with_auto_commit(has_mutations, || {
3069            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3070            let processor = QueryProcessor::for_stores_with_transaction(
3071                Arc::clone(&active),
3072                self.active_write_store(),
3073                Arc::clone(&self.transaction_manager),
3074            )?;
3075            let processor = if let Some(transaction_id) = transaction_id {
3076                processor.with_transaction_context(viewing_epoch, transaction_id)
3077            } else {
3078                processor
3079            };
3080            processor.process(query, QueryLanguage::SqlPgq, Some(&params))
3081        });
3082
3083        #[cfg(feature = "metrics")]
3084        {
3085            #[cfg(not(target_arch = "wasm32"))]
3086            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3087            #[cfg(target_arch = "wasm32")]
3088            let elapsed_ms = None;
3089            self.record_query_metrics("sql", elapsed_ms, &result);
3090        }
3091
3092        result
3093    }
3094
3095    /// Executes a query in the specified language by name.
3096    ///
3097    /// Supported language names: `"gql"`, `"cypher"`, `"gremlin"`, `"graphql"`,
3098    /// `"graphql-rdf"`, `"sparql"`, `"sql"`. Each requires the corresponding feature flag.
3099    ///
3100    /// # Errors
3101    ///
3102    /// Returns an error if the language is unknown/disabled or the query fails.
3103    pub fn execute_language(
3104        &self,
3105        query: &str,
3106        language: &str,
3107        params: Option<std::collections::HashMap<String, Value>>,
3108    ) -> Result<QueryResult> {
3109        let _span = grafeo_info_span!(
3110            "grafeo::session::execute",
3111            language,
3112            query_len = query.len(),
3113        );
3114        match language {
3115            "gql" => {
3116                if let Some(p) = params {
3117                    self.execute_with_params(query, p)
3118                } else {
3119                    self.execute(query)
3120                }
3121            }
3122            #[cfg(feature = "cypher")]
3123            "cypher" => {
3124                if let Some(p) = params {
3125                    use crate::query::processor::{QueryLanguage, QueryProcessor};
3126
3127                    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3128                    let start_time = Instant::now();
3129
3130                    let has_mutations = Self::query_looks_like_mutation(query);
3131                    let active = self.active_store();
3132                    let result = self.with_auto_commit(has_mutations, || {
3133                        let processor = QueryProcessor::for_stores_with_transaction(
3134                            Arc::clone(&active),
3135                            self.active_write_store(),
3136                            Arc::clone(&self.transaction_manager),
3137                        )?;
3138                        let (viewing_epoch, transaction_id) = self.get_transaction_context();
3139                        let processor = if let Some(transaction_id) = transaction_id {
3140                            processor.with_transaction_context(viewing_epoch, transaction_id)
3141                        } else {
3142                            processor
3143                        };
3144                        processor.process(query, QueryLanguage::Cypher, Some(&p))
3145                    });
3146
3147                    #[cfg(feature = "metrics")]
3148                    {
3149                        #[cfg(not(target_arch = "wasm32"))]
3150                        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3151                        #[cfg(target_arch = "wasm32")]
3152                        let elapsed_ms = None;
3153                        self.record_query_metrics("cypher", elapsed_ms, &result);
3154                    }
3155
3156                    result
3157                } else {
3158                    self.execute_cypher(query)
3159                }
3160            }
3161            #[cfg(feature = "gremlin")]
3162            "gremlin" => {
3163                if let Some(p) = params {
3164                    self.execute_gremlin_with_params(query, p)
3165                } else {
3166                    self.execute_gremlin(query)
3167                }
3168            }
3169            #[cfg(feature = "graphql")]
3170            "graphql" => {
3171                if let Some(p) = params {
3172                    self.execute_graphql_with_params(query, p)
3173                } else {
3174                    self.execute_graphql(query)
3175                }
3176            }
3177            #[cfg(all(feature = "graphql", feature = "rdf"))]
3178            "graphql-rdf" => {
3179                if let Some(p) = params {
3180                    self.execute_graphql_rdf_with_params(query, p)
3181                } else {
3182                    self.execute_graphql_rdf(query)
3183                }
3184            }
3185            #[cfg(feature = "sql-pgq")]
3186            "sql" | "sql-pgq" => {
3187                if let Some(p) = params {
3188                    self.execute_sql_with_params(query, p)
3189                } else {
3190                    self.execute_sql(query)
3191                }
3192            }
3193            #[cfg(all(feature = "sparql", feature = "rdf"))]
3194            "sparql" => {
3195                if let Some(p) = params {
3196                    self.execute_sparql_with_params(query, p)
3197                } else {
3198                    self.execute_sparql(query)
3199                }
3200            }
3201            other => Err(grafeo_common::utils::error::Error::Query(
3202                grafeo_common::utils::error::QueryError::new(
3203                    grafeo_common::utils::error::QueryErrorKind::Semantic,
3204                    format!("Unknown query language: '{other}'"),
3205                ),
3206            )),
3207        }
3208    }
3209
3210    /// Begins a new transaction.
3211    ///
3212    /// # Errors
3213    ///
3214    /// Returns an error if a transaction is already active.
3215    ///
3216    /// # Examples
3217    ///
3218    /// ```no_run
3219    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3220    /// use grafeo_engine::GrafeoDB;
3221    ///
3222    /// let db = GrafeoDB::new_in_memory();
3223    /// let mut session = db.session();
3224    ///
3225    /// session.begin_transaction()?;
3226    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3227    /// session.execute("INSERT (:Person {name: 'Gus'})")?;
3228    /// session.commit()?; // Both inserts committed atomically
3229    /// # Ok(())
3230    /// # }
3231    /// ```
3232    /// Clears all cached query plans.
3233    ///
3234    /// The plan cache is shared across all sessions on the same database,
3235    /// so clearing from one session affects all sessions.
3236    pub fn clear_plan_cache(&self) {
3237        self.query_cache.clear();
3238    }
3239
3240    /// Begins a new transaction on this session.
3241    ///
3242    /// Uses the default isolation level (`SnapshotIsolation`).
3243    ///
3244    /// # Errors
3245    ///
3246    /// Returns an error if a transaction is already active.
3247    pub fn begin_transaction(&mut self) -> Result<()> {
3248        self.begin_transaction_inner(false, None)
3249    }
3250
3251    /// Begins a transaction with a specific isolation level.
3252    ///
3253    /// See [`begin_transaction`](Self::begin_transaction) for the default (`SnapshotIsolation`).
3254    ///
3255    /// # Errors
3256    ///
3257    /// Returns an error if a transaction is already active.
3258    pub fn begin_transaction_with_isolation(
3259        &mut self,
3260        isolation_level: crate::transaction::IsolationLevel,
3261    ) -> Result<()> {
3262        self.begin_transaction_inner(false, Some(isolation_level))
3263    }
3264
3265    /// Core transaction begin logic, usable from both `&mut self` and `&self` paths.
3266    fn begin_transaction_inner(
3267        &self,
3268        read_only: bool,
3269        isolation_level: Option<crate::transaction::IsolationLevel>,
3270    ) -> Result<()> {
3271        let _span = grafeo_debug_span!("grafeo::tx::begin", read_only);
3272        let mut current = self.current_transaction.lock();
3273        if current.is_some() {
3274            // Nested transaction: create an auto-savepoint instead of a new tx.
3275            drop(current);
3276            let mut depth = self.transaction_nesting_depth.lock();
3277            *depth += 1;
3278            let sp_name = format!("_nested_tx_{}", *depth);
3279            self.savepoint(&sp_name)?;
3280            return Ok(());
3281        }
3282
3283        let active = self.active_lpg_store();
3284        self.transaction_start_node_count
3285            .store(active.node_count(), Ordering::Relaxed);
3286        self.transaction_start_edge_count
3287            .store(active.edge_count(), Ordering::Relaxed);
3288        let transaction_id = if let Some(level) = isolation_level {
3289            self.transaction_manager.begin_with_isolation(level)
3290        } else {
3291            self.transaction_manager.begin()
3292        };
3293        *current = Some(transaction_id);
3294        *self.read_only_tx.lock() = read_only || self.db_read_only;
3295
3296        // Record the initial graph as "touched" for cross-graph atomicity.
3297        // Uses the full storage key (schema/graph) for schema-scoped resolution.
3298        let key = self.active_graph_storage_key();
3299        let mut touched = self.touched_graphs.lock();
3300        touched.clear();
3301        touched.push(key);
3302
3303        #[cfg(feature = "metrics")]
3304        {
3305            crate::metrics::record_metric!(self.metrics, tx_active, inc);
3306            #[cfg(not(target_arch = "wasm32"))]
3307            {
3308                *self.tx_start_time.lock() = Some(Instant::now());
3309            }
3310        }
3311
3312        Ok(())
3313    }
3314
3315    /// Commits the current transaction.
3316    ///
3317    /// Makes all changes since [`begin_transaction`](Self::begin_transaction) permanent.
3318    ///
3319    /// # Errors
3320    ///
3321    /// Returns an error if no transaction is active.
3322    pub fn commit(&mut self) -> Result<()> {
3323        self.commit_inner()
3324    }
3325
3326    /// Core commit logic, usable from both `&mut self` and `&self` paths.
3327    fn commit_inner(&self) -> Result<()> {
3328        let _span = grafeo_debug_span!("grafeo::tx::commit");
3329        // Nested transaction: release the auto-savepoint (changes are preserved).
3330        {
3331            let mut depth = self.transaction_nesting_depth.lock();
3332            if *depth > 0 {
3333                let sp_name = format!("_nested_tx_{depth}");
3334                *depth -= 1;
3335                drop(depth);
3336                return self.release_savepoint(&sp_name);
3337            }
3338        }
3339
3340        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3341            grafeo_common::utils::error::Error::Transaction(
3342                grafeo_common::utils::error::TransactionError::InvalidState(
3343                    "No active transaction".to_string(),
3344                ),
3345            )
3346        })?;
3347
3348        // Validate the transaction first (conflict detection) before committing data.
3349        // If this fails, we rollback the data changes instead of making them permanent.
3350        let touched = self.touched_graphs.lock().clone();
3351        let commit_epoch = match self.transaction_manager.commit(transaction_id) {
3352            Ok(epoch) => epoch,
3353            Err(e) => {
3354                // Conflict detected: rollback the data changes
3355                for graph_name in &touched {
3356                    let store = self.resolve_store(graph_name);
3357                    store.rollback_transaction_properties(transaction_id);
3358                }
3359                #[cfg(feature = "rdf")]
3360                self.rollback_rdf_transaction(transaction_id);
3361                // Discard buffered CDC events on conflict rollback
3362                #[cfg(feature = "cdc")]
3363                if let Some(ref pending) = self.cdc_pending_events {
3364                    pending.lock().clear();
3365                }
3366                *self.read_only_tx.lock() = self.db_read_only;
3367                self.savepoints.lock().clear();
3368                self.touched_graphs.lock().clear();
3369                #[cfg(feature = "metrics")]
3370                {
3371                    crate::metrics::record_metric!(self.metrics, tx_active, dec);
3372                    crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
3373                    #[cfg(not(target_arch = "wasm32"))]
3374                    if let Some(start) = self.tx_start_time.lock().take() {
3375                        let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3376                        crate::metrics::record_metric!(
3377                            self.metrics,
3378                            tx_duration,
3379                            observe duration_ms
3380                        );
3381                    }
3382                }
3383                return Err(e);
3384            }
3385        };
3386
3387        // Finalize PENDING epochs: make uncommitted versions visible at the commit epoch.
3388        for graph_name in &touched {
3389            let store = self.resolve_store(graph_name);
3390            store.finalize_version_epochs(transaction_id, commit_epoch);
3391        }
3392
3393        // Commit succeeded: discard undo logs (make changes permanent)
3394        #[cfg(feature = "rdf")]
3395        self.commit_rdf_transaction(transaction_id);
3396
3397        for graph_name in &touched {
3398            let store = self.resolve_store(graph_name);
3399            store.commit_transaction_properties(transaction_id);
3400        }
3401
3402        // Flush buffered CDC events now that the transaction is committed.
3403        // All buffered events have PENDING epoch; assign the real commit_epoch.
3404        // Uses record_batch to acquire the write lock once per commit.
3405        #[cfg(feature = "cdc")]
3406        if let Some(ref pending) = self.cdc_pending_events {
3407            let events: Vec<crate::cdc::ChangeEvent> = pending.lock().drain(..).collect();
3408            self.cdc_log.record_batch(events.into_iter().map(|mut e| {
3409                e.epoch = commit_epoch;
3410                e
3411            }));
3412        }
3413
3414        // Sync epoch for all touched graphs so that convenience lookups
3415        // (edge_type, get_edge, get_node) can see versions at the latest epoch.
3416        let current_epoch = self.transaction_manager.current_epoch();
3417        for graph_name in &touched {
3418            let store = self.resolve_store(graph_name);
3419            store.sync_epoch(current_epoch);
3420        }
3421
3422        // Reset read-only flag, clear savepoints and touched graphs
3423        *self.read_only_tx.lock() = self.db_read_only;
3424        self.savepoints.lock().clear();
3425        self.touched_graphs.lock().clear();
3426
3427        // Auto-GC: periodically prune old MVCC versions
3428        if self.gc_interval > 0 {
3429            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
3430            if count.is_multiple_of(self.gc_interval) {
3431                let min_epoch = self.transaction_manager.min_active_epoch();
3432                for graph_name in &touched {
3433                    let store = self.resolve_store(graph_name);
3434                    store.gc_versions(min_epoch);
3435                }
3436                self.transaction_manager.gc();
3437                #[cfg(feature = "metrics")]
3438                crate::metrics::record_metric!(self.metrics, gc_runs, inc);
3439            }
3440        }
3441
3442        #[cfg(feature = "metrics")]
3443        {
3444            crate::metrics::record_metric!(self.metrics, tx_active, dec);
3445            crate::metrics::record_metric!(self.metrics, tx_committed, inc);
3446            #[cfg(not(target_arch = "wasm32"))]
3447            if let Some(start) = self.tx_start_time.lock().take() {
3448                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3449                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3450            }
3451        }
3452
3453        Ok(())
3454    }
3455
3456    /// Aborts the current transaction.
3457    ///
3458    /// Discards all changes since [`begin_transaction`](Self::begin_transaction).
3459    ///
3460    /// # Errors
3461    ///
3462    /// Returns an error if no transaction is active.
3463    ///
3464    /// # Examples
3465    ///
3466    /// ```no_run
3467    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3468    /// use grafeo_engine::GrafeoDB;
3469    ///
3470    /// let db = GrafeoDB::new_in_memory();
3471    /// let mut session = db.session();
3472    ///
3473    /// session.begin_transaction()?;
3474    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3475    /// session.rollback()?; // Insert is discarded
3476    /// # Ok(())
3477    /// # }
3478    /// ```
3479    pub fn rollback(&mut self) -> Result<()> {
3480        self.rollback_inner()
3481    }
3482
3483    /// Core rollback logic, usable from both `&mut self` and `&self` paths.
3484    fn rollback_inner(&self) -> Result<()> {
3485        let _span = grafeo_debug_span!("grafeo::tx::rollback");
3486        // Nested transaction: rollback to the auto-savepoint.
3487        {
3488            let mut depth = self.transaction_nesting_depth.lock();
3489            if *depth > 0 {
3490                let sp_name = format!("_nested_tx_{depth}");
3491                *depth -= 1;
3492                drop(depth);
3493                return self.rollback_to_savepoint(&sp_name);
3494            }
3495        }
3496
3497        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3498            grafeo_common::utils::error::Error::Transaction(
3499                grafeo_common::utils::error::TransactionError::InvalidState(
3500                    "No active transaction".to_string(),
3501                ),
3502            )
3503        })?;
3504
3505        // Reset read-only flag
3506        *self.read_only_tx.lock() = self.db_read_only;
3507
3508        // Discard uncommitted versions in ALL touched LPG stores (cross-graph atomicity).
3509        let touched = self.touched_graphs.lock().clone();
3510        for graph_name in &touched {
3511            let store = self.resolve_store(graph_name);
3512            store.discard_uncommitted_versions(transaction_id);
3513        }
3514
3515        // Discard pending operations in the RDF store
3516        #[cfg(feature = "rdf")]
3517        self.rollback_rdf_transaction(transaction_id);
3518
3519        // Discard buffered CDC events on rollback
3520        #[cfg(feature = "cdc")]
3521        if let Some(ref pending) = self.cdc_pending_events {
3522            pending.lock().clear();
3523        }
3524
3525        // Clear savepoints and touched graphs
3526        self.savepoints.lock().clear();
3527        self.touched_graphs.lock().clear();
3528
3529        // Mark transaction as aborted in the manager
3530        let result = self.transaction_manager.abort(transaction_id);
3531
3532        #[cfg(feature = "metrics")]
3533        if result.is_ok() {
3534            crate::metrics::record_metric!(self.metrics, tx_active, dec);
3535            crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
3536            #[cfg(not(target_arch = "wasm32"))]
3537            if let Some(start) = self.tx_start_time.lock().take() {
3538                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3539                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3540            }
3541        }
3542
3543        result
3544    }
3545
3546    /// Creates a named savepoint within the current transaction.
3547    ///
3548    /// The savepoint captures the current node/edge ID counters so that
3549    /// [`rollback_to_savepoint`](Self::rollback_to_savepoint) can discard
3550    /// entities created after this point.
3551    ///
3552    /// # Errors
3553    ///
3554    /// Returns an error if no transaction is active.
3555    pub fn savepoint(&self, name: &str) -> Result<()> {
3556        let tx_id = self.current_transaction.lock().ok_or_else(|| {
3557            grafeo_common::utils::error::Error::Transaction(
3558                grafeo_common::utils::error::TransactionError::InvalidState(
3559                    "No active transaction".to_string(),
3560                ),
3561            )
3562        })?;
3563
3564        // Capture state for every graph touched so far.
3565        let touched = self.touched_graphs.lock().clone();
3566        let graph_snapshots: Vec<GraphSavepoint> = touched
3567            .iter()
3568            .map(|graph_name| {
3569                let store = self.resolve_store(graph_name);
3570                GraphSavepoint {
3571                    graph_name: graph_name.clone(),
3572                    next_node_id: store.peek_next_node_id(),
3573                    next_edge_id: store.peek_next_edge_id(),
3574                    undo_log_position: store.property_undo_log_position(tx_id),
3575                }
3576            })
3577            .collect();
3578
3579        self.savepoints.lock().push(SavepointState {
3580            name: name.to_string(),
3581            graph_snapshots,
3582            active_graph: self.current_graph.lock().clone(),
3583            #[cfg(feature = "cdc")]
3584            cdc_event_position: self
3585                .cdc_pending_events
3586                .as_ref()
3587                .map_or(0, |p| p.lock().len()),
3588        });
3589        Ok(())
3590    }
3591
3592    /// Rolls back to a named savepoint, undoing all writes made after it.
3593    ///
3594    /// The savepoint and any savepoints created after it are removed.
3595    /// Entities with IDs >= the savepoint snapshot are discarded.
3596    ///
3597    /// # Errors
3598    ///
3599    /// Returns an error if no transaction is active or the savepoint does not exist.
3600    pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
3601        let transaction_id = self.current_transaction.lock().ok_or_else(|| {
3602            grafeo_common::utils::error::Error::Transaction(
3603                grafeo_common::utils::error::TransactionError::InvalidState(
3604                    "No active transaction".to_string(),
3605                ),
3606            )
3607        })?;
3608
3609        let mut savepoints = self.savepoints.lock();
3610
3611        // Find the savepoint by name (search from the end for nested savepoints)
3612        let pos = savepoints
3613            .iter()
3614            .rposition(|sp| sp.name == name)
3615            .ok_or_else(|| {
3616                grafeo_common::utils::error::Error::Transaction(
3617                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
3618                        "Savepoint '{name}' not found"
3619                    )),
3620                )
3621            })?;
3622
3623        let sp_state = savepoints[pos].clone();
3624
3625        // Remove this savepoint and all later ones
3626        savepoints.truncate(pos);
3627        drop(savepoints);
3628
3629        // Roll back each graph that was captured in the savepoint.
3630        for gs in &sp_state.graph_snapshots {
3631            let store = self.resolve_store(&gs.graph_name);
3632
3633            // Replay property/label undo entries recorded after the savepoint
3634            store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);
3635
3636            // Discard entities created after the savepoint
3637            let current_next_node = store.peek_next_node_id();
3638            let current_next_edge = store.peek_next_edge_id();
3639
3640            let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
3641                .map(NodeId::new)
3642                .collect();
3643            let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
3644                .map(EdgeId::new)
3645                .collect();
3646
3647            if !node_ids.is_empty() || !edge_ids.is_empty() {
3648                store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
3649            }
3650        }
3651
3652        // Also roll back any graphs that were touched AFTER the savepoint
3653        // but not captured in it. These need full discard since the savepoint
3654        // didn't include them.
3655        let touched = self.touched_graphs.lock().clone();
3656        for graph_name in &touched {
3657            let already_captured = sp_state
3658                .graph_snapshots
3659                .iter()
3660                .any(|gs| gs.graph_name == *graph_name);
3661            if !already_captured {
3662                let store = self.resolve_store(graph_name);
3663                store.discard_uncommitted_versions(transaction_id);
3664            }
3665        }
3666
3667        // Truncate CDC event buffer to the savepoint position.
3668        #[cfg(feature = "cdc")]
3669        if let Some(ref pending) = self.cdc_pending_events {
3670            pending.lock().truncate(sp_state.cdc_event_position);
3671        }
3672
3673        // Restore touched_graphs to only the graphs that were known at savepoint time.
3674        let mut touched = self.touched_graphs.lock();
3675        touched.clear();
3676        for gs in &sp_state.graph_snapshots {
3677            if !touched.contains(&gs.graph_name) {
3678                touched.push(gs.graph_name.clone());
3679            }
3680        }
3681
3682        Ok(())
3683    }
3684
3685    /// Releases (removes) a named savepoint without rolling back.
3686    ///
3687    /// # Errors
3688    ///
3689    /// Returns an error if no transaction is active or the savepoint does not exist.
3690    pub fn release_savepoint(&self, name: &str) -> Result<()> {
3691        let _tx_id = self.current_transaction.lock().ok_or_else(|| {
3692            grafeo_common::utils::error::Error::Transaction(
3693                grafeo_common::utils::error::TransactionError::InvalidState(
3694                    "No active transaction".to_string(),
3695                ),
3696            )
3697        })?;
3698
3699        let mut savepoints = self.savepoints.lock();
3700        let pos = savepoints
3701            .iter()
3702            .rposition(|sp| sp.name == name)
3703            .ok_or_else(|| {
3704                grafeo_common::utils::error::Error::Transaction(
3705                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
3706                        "Savepoint '{name}' not found"
3707                    )),
3708                )
3709            })?;
3710        savepoints.remove(pos);
3711        Ok(())
3712    }
3713
3714    /// Returns whether a transaction is active.
3715    #[must_use]
3716    pub fn in_transaction(&self) -> bool {
3717        self.current_transaction.lock().is_some()
3718    }
3719
3720    /// Returns the current transaction ID, if any.
3721    #[must_use]
3722    pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
3723        *self.current_transaction.lock()
3724    }
3725
3726    /// Returns a reference to the transaction manager.
3727    #[must_use]
3728    pub(crate) fn transaction_manager(&self) -> &TransactionManager {
3729        &self.transaction_manager
3730    }
3731
3732    /// Returns the store's current node count and the count at transaction start.
3733    #[must_use]
3734    pub(crate) fn node_count_delta(&self) -> (usize, usize) {
3735        (
3736            self.transaction_start_node_count.load(Ordering::Relaxed),
3737            self.active_lpg_store().node_count(),
3738        )
3739    }
3740
3741    /// Returns the store's current edge count and the count at transaction start.
3742    #[must_use]
3743    pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
3744        (
3745            self.transaction_start_edge_count.load(Ordering::Relaxed),
3746            self.active_lpg_store().edge_count(),
3747        )
3748    }
3749
3750    /// Prepares the current transaction for a two-phase commit.
3751    ///
3752    /// Returns a [`PreparedCommit`](crate::transaction::PreparedCommit) that
3753    /// lets you inspect pending changes and attach metadata before finalizing.
3754    /// The mutable borrow prevents concurrent operations while the commit is
3755    /// pending.
3756    ///
3757    /// If the `PreparedCommit` is dropped without calling `commit()` or
3758    /// `abort()`, the transaction is automatically rolled back.
3759    ///
3760    /// # Errors
3761    ///
3762    /// Returns an error if no transaction is active.
3763    ///
3764    /// # Examples
3765    ///
3766    /// ```no_run
3767    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3768    /// use grafeo_engine::GrafeoDB;
3769    ///
3770    /// let db = GrafeoDB::new_in_memory();
3771    /// let mut session = db.session();
3772    ///
3773    /// session.begin_transaction()?;
3774    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3775    ///
3776    /// let mut prepared = session.prepare_commit()?;
3777    /// println!("Nodes written: {}", prepared.info().nodes_written);
3778    /// prepared.set_metadata("audit_user", "admin");
3779    /// prepared.commit()?;
3780    /// # Ok(())
3781    /// # }
3782    /// ```
3783    pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
3784        crate::transaction::PreparedCommit::new(self)
3785    }
3786
3787    /// Sets auto-commit mode.
3788    pub fn set_auto_commit(&mut self, auto_commit: bool) {
3789        self.auto_commit = auto_commit;
3790    }
3791
3792    /// Returns whether auto-commit is enabled.
3793    #[must_use]
3794    pub fn auto_commit(&self) -> bool {
3795        self.auto_commit
3796    }
3797
3798    /// Returns `true` if auto-commit should wrap this execution.
3799    ///
3800    /// Auto-commit kicks in when: the session is in auto-commit mode,
3801    /// no explicit transaction is active, and the query mutates data.
3802    fn needs_auto_commit(&self, has_mutations: bool) -> bool {
3803        self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
3804    }
3805
3806    /// Wraps `body` in an automatic begin/commit when [`needs_auto_commit`]
3807    /// returns `true`. On error the transaction is rolled back.
3808    fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
3809    where
3810        F: FnOnce() -> Result<QueryResult>,
3811    {
3812        if self.needs_auto_commit(has_mutations) {
3813            self.begin_transaction_inner(false, None)?;
3814            match body() {
3815                Ok(result) => {
3816                    self.commit_inner()?;
3817                    Ok(result)
3818                }
3819                Err(e) => {
3820                    let _ = self.rollback_inner();
3821                    Err(e)
3822                }
3823            }
3824        } else {
3825            body()
3826        }
3827    }
3828
3829    /// Quick heuristic: returns `true` when the query text looks like it
3830    /// performs a mutation. Used by `_with_params` paths that go through the
3831    /// `QueryProcessor` (where the logical plan isn't available before
3832    /// execution). False negatives are harmless: the data just won't be
3833    /// auto-committed, which matches the prior behaviour.
3834    fn query_looks_like_mutation(query: &str) -> bool {
3835        let upper = query.to_ascii_uppercase();
3836        upper.contains("INSERT")
3837            || upper.contains("CREATE")
3838            || upper.contains("DELETE")
3839            || upper.contains("MERGE")
3840            || upper.contains("SET")
3841            || upper.contains("REMOVE")
3842            || upper.contains("DROP")
3843            || upper.contains("ALTER")
3844    }
3845
3846    /// Computes the wall-clock deadline for query execution.
3847    #[must_use]
3848    fn query_deadline(&self) -> Option<Instant> {
3849        #[cfg(not(target_arch = "wasm32"))]
3850        {
3851            self.query_timeout.map(|d| Instant::now() + d)
3852        }
3853        #[cfg(target_arch = "wasm32")]
3854        {
3855            let _ = &self.query_timeout;
3856            None
3857        }
3858    }
3859
3860    /// Records query metrics for any language.
3861    ///
3862    /// Called after query execution to update counters, latency histogram,
3863    /// and per-language tracking. `elapsed_ms` should be `None` on WASM
3864    /// where `Instant` is unavailable.
3865    #[cfg(feature = "metrics")]
3866    fn record_query_metrics(
3867        &self,
3868        language: &str,
3869        elapsed_ms: Option<f64>,
3870        result: &Result<crate::database::QueryResult>,
3871    ) {
3872        use crate::metrics::record_metric;
3873
3874        record_metric!(self.metrics, query_count, inc);
3875        if let Some(ref reg) = self.metrics {
3876            reg.query_count_by_language.increment(language);
3877        }
3878        if let Some(ms) = elapsed_ms {
3879            record_metric!(self.metrics, query_latency, observe ms);
3880        }
3881        match result {
3882            Ok(r) => {
3883                let returned = r.rows.len() as u64;
3884                record_metric!(self.metrics, rows_returned, add returned);
3885                if let Some(scanned) = r.rows_scanned {
3886                    record_metric!(self.metrics, rows_scanned, add scanned);
3887                }
3888            }
3889            Err(e) => {
3890                record_metric!(self.metrics, query_errors, inc);
3891                // Detect timeout errors
3892                let msg = e.to_string();
3893                if msg.contains("exceeded timeout") {
3894                    record_metric!(self.metrics, query_timeouts, inc);
3895                }
3896            }
3897        }
3898    }
3899
3900    /// Evaluates a simple integer literal from a session parameter expression.
3901    fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
3902        use grafeo_adapters::query::gql::ast::{Expression, Literal};
3903        match expr {
3904            Expression::Literal(Literal::Integer(n)) => Some(*n),
3905            _ => None,
3906        }
3907    }
3908
3909    /// Returns the current transaction context for MVCC visibility.
3910    ///
3911    /// Returns `(viewing_epoch, transaction_id)` where:
3912    /// - `viewing_epoch` is the epoch at which to check version visibility
3913    /// - `transaction_id` is the current transaction ID (if in a transaction)
3914    #[must_use]
3915    fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
3916        // Time-travel override takes precedence (read-only, no tx context)
3917        if let Some(epoch) = *self.viewing_epoch_override.lock() {
3918            return (epoch, None);
3919        }
3920
3921        if let Some(transaction_id) = *self.current_transaction.lock() {
3922            // In a transaction: use the transaction's start epoch
3923            let epoch = self
3924                .transaction_manager
3925                .start_epoch(transaction_id)
3926                .unwrap_or_else(|| self.transaction_manager.current_epoch());
3927            (epoch, Some(transaction_id))
3928        } else {
3929            // No transaction: use current epoch
3930            (self.transaction_manager.current_epoch(), None)
3931        }
3932    }
3933
3934    /// Creates a planner with transaction context and constraint validator.
3935    ///
3936    /// The `store` parameter is the graph store to plan against (use
3937    /// `self.active_store()` for graph-aware execution).
3938    fn create_planner_for_store(
3939        &self,
3940        store: Arc<dyn GraphStore>,
3941        viewing_epoch: EpochId,
3942        transaction_id: Option<TransactionId>,
3943    ) -> crate::query::Planner {
3944        self.create_planner_for_store_with_read_only(store, viewing_epoch, transaction_id, false)
3945    }
3946
3947    fn create_planner_for_store_with_read_only(
3948        &self,
3949        store: Arc<dyn GraphStore>,
3950        viewing_epoch: EpochId,
3951        transaction_id: Option<TransactionId>,
3952        read_only: bool,
3953    ) -> crate::query::Planner {
3954        use crate::query::Planner;
3955        use grafeo_core::execution::operators::{LazyValue, SessionContext};
3956
3957        // Capture store reference for lazy introspection (only computed if info()/schema() called).
3958        let info_store = Arc::clone(&store);
3959        let schema_store = Arc::clone(&store);
3960
3961        let session_context = SessionContext {
3962            current_schema: self.current_schema(),
3963            current_graph: self.current_graph(),
3964            db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
3965            schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
3966        };
3967
3968        let write_store = self.active_write_store();
3969
3970        let mut planner = Planner::with_context(
3971            Arc::clone(&store),
3972            write_store,
3973            Arc::clone(&self.transaction_manager),
3974            transaction_id,
3975            viewing_epoch,
3976        )
3977        .with_factorized_execution(self.factorized_execution)
3978        .with_catalog(Arc::clone(&self.catalog))
3979        .with_session_context(session_context)
3980        .with_read_only(read_only);
3981
3982        // Attach the constraint validator for schema enforcement
3983        let validator =
3984            CatalogConstraintValidator::new(Arc::clone(&self.catalog)).with_store(store);
3985        planner = planner.with_validator(Arc::new(validator));
3986
3987        planner
3988    }
3989
3990    /// Builds a `Value::Map` for the `info()` introspection function.
3991    fn build_info_value(store: &dyn GraphStore) -> Value {
3992        use grafeo_common::types::PropertyKey;
3993        use std::collections::BTreeMap;
3994
3995        let mut map = BTreeMap::new();
3996        map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
3997        map.insert(
3998            PropertyKey::from("node_count"),
3999            Value::Int64(store.node_count() as i64),
4000        );
4001        map.insert(
4002            PropertyKey::from("edge_count"),
4003            Value::Int64(store.edge_count() as i64),
4004        );
4005        map.insert(
4006            PropertyKey::from("version"),
4007            Value::String(env!("CARGO_PKG_VERSION").into()),
4008        );
4009        Value::Map(map.into())
4010    }
4011
4012    /// Builds a `Value::Map` for the `schema()` introspection function.
4013    fn build_schema_value(store: &dyn GraphStore) -> Value {
4014        use grafeo_common::types::PropertyKey;
4015        use std::collections::BTreeMap;
4016
4017        let labels: Vec<Value> = store
4018            .all_labels()
4019            .into_iter()
4020            .map(|l| Value::String(l.into()))
4021            .collect();
4022        let edge_types: Vec<Value> = store
4023            .all_edge_types()
4024            .into_iter()
4025            .map(|t| Value::String(t.into()))
4026            .collect();
4027        let property_keys: Vec<Value> = store
4028            .all_property_keys()
4029            .into_iter()
4030            .map(|k| Value::String(k.into()))
4031            .collect();
4032
4033        let mut map = BTreeMap::new();
4034        map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
4035        map.insert(
4036            PropertyKey::from("edge_types"),
4037            Value::List(edge_types.into()),
4038        );
4039        map.insert(
4040            PropertyKey::from("property_keys"),
4041            Value::List(property_keys.into()),
4042        );
4043        Value::Map(map.into())
4044    }
4045
4046    /// Creates a node directly (bypassing query execution).
4047    ///
4048    /// This is a low-level API for testing and direct manipulation.
4049    /// If a transaction is active, the node will be versioned with the transaction ID.
4050    pub fn create_node(&self, labels: &[&str]) -> NodeId {
4051        let (epoch, transaction_id) = self.get_transaction_context();
4052        self.active_lpg_store().create_node_versioned(
4053            labels,
4054            epoch,
4055            transaction_id.unwrap_or(TransactionId::SYSTEM),
4056        )
4057    }
4058
4059    /// Creates a node with properties.
4060    ///
4061    /// If a transaction is active, the node will be versioned with the transaction ID.
4062    pub fn create_node_with_props<'a>(
4063        &self,
4064        labels: &[&str],
4065        properties: impl IntoIterator<Item = (&'a str, Value)>,
4066    ) -> NodeId {
4067        let (epoch, transaction_id) = self.get_transaction_context();
4068        self.active_lpg_store().create_node_with_props_versioned(
4069            labels,
4070            properties,
4071            epoch,
4072            transaction_id.unwrap_or(TransactionId::SYSTEM),
4073        )
4074    }
4075
4076    /// Creates an edge between two nodes.
4077    ///
4078    /// This is a low-level API for testing and direct manipulation.
4079    /// If a transaction is active, the edge will be versioned with the transaction ID.
4080    pub fn create_edge(
4081        &self,
4082        src: NodeId,
4083        dst: NodeId,
4084        edge_type: &str,
4085    ) -> grafeo_common::types::EdgeId {
4086        let (epoch, transaction_id) = self.get_transaction_context();
4087        self.active_lpg_store().create_edge_versioned(
4088            src,
4089            dst,
4090            edge_type,
4091            epoch,
4092            transaction_id.unwrap_or(TransactionId::SYSTEM),
4093        )
4094    }
4095
4096    /// Creates an edge with properties within the active transaction context.
4097    pub fn create_edge_with_props<'a>(
4098        &self,
4099        src: NodeId,
4100        dst: NodeId,
4101        edge_type: &str,
4102        properties: impl IntoIterator<Item = (&'a str, Value)>,
4103    ) -> grafeo_common::types::EdgeId {
4104        let (epoch, transaction_id) = self.get_transaction_context();
4105        let tid = transaction_id.unwrap_or(TransactionId::SYSTEM);
4106        let store = self.active_lpg_store();
4107        let eid = store.create_edge_versioned(src, dst, edge_type, epoch, tid);
4108        for (key, value) in properties {
4109            store.set_edge_property_versioned(eid, key, value, tid);
4110        }
4111        eid
4112    }
4113
4114    /// Sets a node property within the active transaction context.
4115    pub fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
4116        let (_, transaction_id) = self.get_transaction_context();
4117        if let Some(tid) = transaction_id {
4118            self.active_lpg_store()
4119                .set_node_property_versioned(id, key, value, tid);
4120        } else {
4121            self.active_lpg_store().set_node_property(id, key, value);
4122        }
4123    }
4124
4125    /// Sets an edge property within the active transaction context.
4126    pub fn set_edge_property(&self, id: grafeo_common::types::EdgeId, key: &str, value: Value) {
4127        let (_, transaction_id) = self.get_transaction_context();
4128        if let Some(tid) = transaction_id {
4129            self.active_lpg_store()
4130                .set_edge_property_versioned(id, key, value, tid);
4131        } else {
4132            self.active_lpg_store().set_edge_property(id, key, value);
4133        }
4134    }
4135
4136    /// Deletes a node within the active transaction context.
4137    pub fn delete_node(&self, id: NodeId) -> bool {
4138        let (epoch, transaction_id) = self.get_transaction_context();
4139        if let Some(tid) = transaction_id {
4140            self.active_lpg_store()
4141                .delete_node_versioned(id, epoch, tid)
4142        } else {
4143            self.active_lpg_store().delete_node(id)
4144        }
4145    }
4146
4147    /// Deletes an edge within the active transaction context.
4148    pub fn delete_edge(&self, id: grafeo_common::types::EdgeId) -> bool {
4149        let (epoch, transaction_id) = self.get_transaction_context();
4150        if let Some(tid) = transaction_id {
4151            self.active_lpg_store()
4152                .delete_edge_versioned(id, epoch, tid)
4153        } else {
4154            self.active_lpg_store().delete_edge(id)
4155        }
4156    }
4157
4158    // =========================================================================
4159    // Direct Lookup APIs (bypass query planning for O(1) point reads)
4160    // =========================================================================
4161
4162    /// Gets a node by ID directly, bypassing query planning.
4163    ///
4164    /// This is the fastest way to retrieve a single node when you know its ID.
4165    /// Skips parsing, binding, optimization, and physical planning entirely.
4166    ///
4167    /// # Performance
4168    ///
4169    /// - Time complexity: O(1) average case
4170    /// - No lock contention (uses DashMap internally)
4171    /// - ~20-30x faster than equivalent MATCH query
4172    ///
4173    /// # Example
4174    ///
4175    /// ```no_run
4176    /// # use grafeo_engine::GrafeoDB;
4177    /// # let db = GrafeoDB::new_in_memory();
4178    /// let session = db.session();
4179    /// let node_id = session.create_node(&["Person"]);
4180    ///
4181    /// // Direct lookup - O(1), no query planning
4182    /// let node = session.get_node(node_id);
4183    /// assert!(node.is_some());
4184    /// ```
4185    #[must_use]
4186    pub fn get_node(&self, id: NodeId) -> Option<Node> {
4187        let (epoch, transaction_id) = self.get_transaction_context();
4188        self.active_lpg_store().get_node_versioned(
4189            id,
4190            epoch,
4191            transaction_id.unwrap_or(TransactionId::SYSTEM),
4192        )
4193    }
4194
4195    /// Gets a single property from a node by ID, bypassing query planning.
4196    ///
4197    /// More efficient than `get_node()` when you only need one property,
4198    /// as it avoids loading the full node with all properties.
4199    ///
4200    /// # Performance
4201    ///
4202    /// - Time complexity: O(1) average case
4203    /// - No query planning overhead
4204    ///
4205    /// # Example
4206    ///
4207    /// ```no_run
4208    /// # use grafeo_engine::GrafeoDB;
4209    /// # use grafeo_common::types::Value;
4210    /// # let db = GrafeoDB::new_in_memory();
4211    /// let session = db.session();
4212    /// let id = session.create_node_with_props(&["Person"], [("name", "Alix".into())]);
4213    ///
4214    /// // Direct property access - O(1)
4215    /// let name = session.get_node_property(id, "name");
4216    /// assert_eq!(name, Some(Value::String("Alix".into())));
4217    /// ```
4218    #[must_use]
4219    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
4220        self.get_node(id)
4221            .and_then(|node| node.get_property(key).cloned())
4222    }
4223
4224    /// Gets an edge by ID directly, bypassing query planning.
4225    ///
4226    /// # Performance
4227    ///
4228    /// - Time complexity: O(1) average case
4229    /// - No lock contention
4230    #[must_use]
4231    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
4232        let (epoch, transaction_id) = self.get_transaction_context();
4233        self.active_lpg_store().get_edge_versioned(
4234            id,
4235            epoch,
4236            transaction_id.unwrap_or(TransactionId::SYSTEM),
4237        )
4238    }
4239
4240    /// Gets outgoing neighbors of a node directly, bypassing query planning.
4241    ///
4242    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
4243    ///
4244    /// # Performance
4245    ///
4246    /// - Time complexity: O(degree) where degree is the number of outgoing edges
4247    /// - Uses adjacency index for direct access
4248    /// - ~10-20x faster than equivalent MATCH query
4249    ///
4250    /// # Example
4251    ///
4252    /// ```no_run
4253    /// # use grafeo_engine::GrafeoDB;
4254    /// # let db = GrafeoDB::new_in_memory();
4255    /// let session = db.session();
4256    /// let alix = session.create_node(&["Person"]);
4257    /// let gus = session.create_node(&["Person"]);
4258    /// session.create_edge(alix, gus, "KNOWS");
4259    ///
4260    /// // Direct neighbor lookup - O(degree)
4261    /// let neighbors = session.get_neighbors_outgoing(alix);
4262    /// assert_eq!(neighbors.len(), 1);
4263    /// assert_eq!(neighbors[0].0, gus);
4264    /// ```
4265    #[must_use]
4266    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4267        self.active_lpg_store()
4268            .edges_from(node, Direction::Outgoing)
4269            .collect()
4270    }
4271
4272    /// Gets incoming neighbors of a node directly, bypassing query planning.
4273    ///
4274    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
4275    ///
4276    /// # Performance
4277    ///
4278    /// - Time complexity: O(degree) where degree is the number of incoming edges
4279    /// - Uses backward adjacency index for direct access
4280    #[must_use]
4281    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4282        self.active_lpg_store()
4283            .edges_from(node, Direction::Incoming)
4284            .collect()
4285    }
4286
4287    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
4288    ///
4289    /// # Example
4290    ///
4291    /// ```no_run
4292    /// # use grafeo_engine::GrafeoDB;
4293    /// # let db = GrafeoDB::new_in_memory();
4294    /// # let session = db.session();
4295    /// # let alix = session.create_node(&["Person"]);
4296    /// let neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
4297    /// ```
4298    #[must_use]
4299    pub fn get_neighbors_outgoing_by_type(
4300        &self,
4301        node: NodeId,
4302        edge_type: &str,
4303    ) -> Vec<(NodeId, EdgeId)> {
4304        self.active_lpg_store()
4305            .edges_from(node, Direction::Outgoing)
4306            .filter(|(_, edge_id)| {
4307                self.get_edge(*edge_id)
4308                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
4309            })
4310            .collect()
4311    }
4312
4313    /// Checks if a node exists, bypassing query planning.
4314    ///
4315    /// # Performance
4316    ///
4317    /// - Time complexity: O(1)
4318    /// - Fastest existence check available
4319    #[must_use]
4320    pub fn node_exists(&self, id: NodeId) -> bool {
4321        self.get_node(id).is_some()
4322    }
4323
4324    /// Checks if an edge exists, bypassing query planning.
4325    #[must_use]
4326    pub fn edge_exists(&self, id: EdgeId) -> bool {
4327        self.get_edge(id).is_some()
4328    }
4329
4330    /// Gets the degree (number of edges) of a node.
4331    ///
4332    /// Returns (outgoing_degree, incoming_degree).
4333    #[must_use]
4334    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
4335        let active = self.active_lpg_store();
4336        let out = active.out_degree(node);
4337        let in_degree = active.in_degree(node);
4338        (out, in_degree)
4339    }
4340
4341    /// Batch lookup of multiple nodes by ID.
4342    ///
4343    /// More efficient than calling `get_node()` in a loop because it
4344    /// amortizes overhead.
4345    ///
4346    /// # Performance
4347    ///
4348    /// - Time complexity: O(n) where n is the number of IDs
4349    /// - Better cache utilization than individual lookups
4350    #[must_use]
4351    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
4352        let (epoch, transaction_id) = self.get_transaction_context();
4353        let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
4354        let active = self.active_lpg_store();
4355        ids.iter()
4356            .map(|&id| active.get_node_versioned(id, epoch, tx))
4357            .collect()
4358    }
4359
4360    // ── Change Data Capture ─────────────────────────────────────────────
4361
4362    /// Returns the full change history for an entity (node or edge).
4363    ///
4364    /// # Errors
4365    ///
4366    /// Currently infallible, but returns `Result` for forward compatibility.
4367    #[cfg(feature = "cdc")]
4368    pub fn history(
4369        &self,
4370        entity_id: impl Into<crate::cdc::EntityId>,
4371    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4372        Ok(self.cdc_log.history(entity_id.into()))
4373    }
4374
4375    /// Returns change events for an entity since the given epoch.
4376    ///
4377    /// # Errors
4378    ///
4379    /// Currently infallible, but returns `Result` for forward compatibility.
4380    #[cfg(feature = "cdc")]
4381    pub fn history_since(
4382        &self,
4383        entity_id: impl Into<crate::cdc::EntityId>,
4384        since_epoch: EpochId,
4385    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4386        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
4387    }
4388
4389    /// Returns all change events across all entities in an epoch range.
4390    ///
4391    /// # Errors
4392    ///
4393    /// Currently infallible, but returns `Result` for forward compatibility.
4394    #[cfg(feature = "cdc")]
4395    pub fn changes_between(
4396        &self,
4397        start_epoch: EpochId,
4398        end_epoch: EpochId,
4399    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4400        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
4401    }
4402}
4403
4404impl Drop for Session {
4405    fn drop(&mut self) {
4406        // Auto-rollback any active transaction to prevent leaked MVCC state,
4407        // dangling write locks, and uncommitted versions lingering in the store.
4408        if self.in_transaction() {
4409            let _ = self.rollback_inner();
4410        }
4411
4412        #[cfg(feature = "metrics")]
4413        if let Some(ref reg) = self.metrics {
4414            reg.session_active
4415                .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
4416        }
4417    }
4418}
4419
4420#[cfg(test)]
4421mod tests {
4422    use super::parse_default_literal;
4423    use crate::database::GrafeoDB;
4424    use grafeo_common::types::Value;
4425
4426    // -----------------------------------------------------------------------
4427    // parse_default_literal
4428    // -----------------------------------------------------------------------
4429
4430    #[test]
4431    fn parse_default_literal_null() {
4432        assert_eq!(parse_default_literal("null"), Value::Null);
4433        assert_eq!(parse_default_literal("NULL"), Value::Null);
4434        assert_eq!(parse_default_literal("Null"), Value::Null);
4435    }
4436
4437    #[test]
4438    fn parse_default_literal_bool() {
4439        assert_eq!(parse_default_literal("true"), Value::Bool(true));
4440        assert_eq!(parse_default_literal("TRUE"), Value::Bool(true));
4441        assert_eq!(parse_default_literal("false"), Value::Bool(false));
4442        assert_eq!(parse_default_literal("FALSE"), Value::Bool(false));
4443    }
4444
4445    #[test]
4446    fn parse_default_literal_string_single_quoted() {
4447        assert_eq!(
4448            parse_default_literal("'hello'"),
4449            Value::String("hello".into())
4450        );
4451    }
4452
4453    #[test]
4454    fn parse_default_literal_string_double_quoted() {
4455        assert_eq!(
4456            parse_default_literal("\"world\""),
4457            Value::String("world".into())
4458        );
4459    }
4460
4461    #[test]
4462    fn parse_default_literal_integer() {
4463        assert_eq!(parse_default_literal("42"), Value::Int64(42));
4464        assert_eq!(parse_default_literal("-7"), Value::Int64(-7));
4465        assert_eq!(parse_default_literal("0"), Value::Int64(0));
4466    }
4467
4468    #[test]
4469    fn parse_default_literal_float() {
4470        assert_eq!(parse_default_literal("9.81"), Value::Float64(9.81_f64));
4471        assert_eq!(parse_default_literal("-0.5"), Value::Float64(-0.5));
4472    }
4473
4474    #[test]
4475    fn parse_default_literal_fallback_string() {
4476        // Not a recognized literal, not quoted, not a number
4477        assert_eq!(
4478            parse_default_literal("some_identifier"),
4479            Value::String("some_identifier".into())
4480        );
4481    }
4482
4483    #[test]
4484    fn test_session_create_node() {
4485        let db = GrafeoDB::new_in_memory();
4486        let session = db.session();
4487
4488        let id = session.create_node(&["Person"]);
4489        assert!(id.is_valid());
4490        assert_eq!(db.node_count(), 1);
4491    }
4492
4493    #[test]
4494    fn test_session_transaction() {
4495        let db = GrafeoDB::new_in_memory();
4496        let mut session = db.session();
4497
4498        assert!(!session.in_transaction());
4499
4500        session.begin_transaction().unwrap();
4501        assert!(session.in_transaction());
4502
4503        session.commit().unwrap();
4504        assert!(!session.in_transaction());
4505    }
4506
4507    #[test]
4508    fn test_session_transaction_context() {
4509        let db = GrafeoDB::new_in_memory();
4510        let mut session = db.session();
4511
4512        // Without transaction - context should have current epoch and no transaction_id
4513        let (_epoch1, transaction_id1) = session.get_transaction_context();
4514        assert!(transaction_id1.is_none());
4515
4516        // Start a transaction
4517        session.begin_transaction().unwrap();
4518        let (epoch2, transaction_id2) = session.get_transaction_context();
4519        assert!(transaction_id2.is_some());
4520        // Transaction should have a valid epoch
4521        let _ = epoch2; // Use the variable
4522
4523        // Commit and verify
4524        session.commit().unwrap();
4525        let (epoch3, tx_id3) = session.get_transaction_context();
4526        assert!(tx_id3.is_none());
4527        // Epoch should have advanced after commit
4528        assert!(epoch3.as_u64() >= epoch2.as_u64());
4529    }
4530
4531    #[test]
4532    fn test_session_rollback() {
4533        let db = GrafeoDB::new_in_memory();
4534        let mut session = db.session();
4535
4536        session.begin_transaction().unwrap();
4537        session.rollback().unwrap();
4538        assert!(!session.in_transaction());
4539    }
4540
4541    #[test]
4542    fn test_session_rollback_discards_versions() {
4543        use grafeo_common::types::TransactionId;
4544
4545        let db = GrafeoDB::new_in_memory();
4546
4547        // Create a node outside of any transaction (at system level)
4548        let node_before = db.store().create_node(&["Person"]);
4549        assert!(node_before.is_valid());
4550        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4551
4552        // Start a transaction
4553        let mut session = db.session();
4554        session.begin_transaction().unwrap();
4555        let transaction_id = session.current_transaction.lock().unwrap();
4556
4557        // Create a node versioned with the transaction's ID
4558        let epoch = db.store().current_epoch();
4559        let node_in_tx = db
4560            .store()
4561            .create_node_versioned(&["Person"], epoch, transaction_id);
4562        assert!(node_in_tx.is_valid());
4563
4564        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4565        // non-versioned lookups like node_count(). Verify the node is visible
4566        // only through the owning transaction.
4567        assert_eq!(
4568            db.node_count(),
4569            1,
4570            "PENDING nodes should be invisible to non-versioned node_count()"
4571        );
4572        assert!(
4573            db.store()
4574                .get_node_versioned(node_in_tx, epoch, transaction_id)
4575                .is_some(),
4576            "Transaction node should be visible to its own transaction"
4577        );
4578
4579        // Rollback the transaction
4580        session.rollback().unwrap();
4581        assert!(!session.in_transaction());
4582
4583        // The node created in the transaction should be discarded
4584        // Only the first node should remain visible
4585        let count_after = db.node_count();
4586        assert_eq!(
4587            count_after, 1,
4588            "Rollback should discard uncommitted node, but got {count_after}"
4589        );
4590
4591        // The original node should still be accessible
4592        let current_epoch = db.store().current_epoch();
4593        assert!(
4594            db.store()
4595                .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
4596                .is_some(),
4597            "Original node should still exist"
4598        );
4599
4600        // The node created in the transaction should not be accessible
4601        assert!(
4602            db.store()
4603                .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
4604                .is_none(),
4605            "Transaction node should be gone"
4606        );
4607    }
4608
4609    #[test]
4610    fn test_session_create_node_in_transaction() {
4611        // Test that session.create_node() is transaction-aware
4612        let db = GrafeoDB::new_in_memory();
4613
4614        // Create a node outside of any transaction
4615        let node_before = db.create_node(&["Person"]);
4616        assert!(node_before.is_valid());
4617        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4618
4619        // Start a transaction and create a node through the session
4620        let mut session = db.session();
4621        session.begin_transaction().unwrap();
4622        let transaction_id = session.current_transaction.lock().unwrap();
4623
4624        // Create a node through session.create_node() - should be versioned with tx
4625        let node_in_tx = session.create_node(&["Person"]);
4626        assert!(node_in_tx.is_valid());
4627
4628        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4629        // non-versioned lookups. Verify the node is visible only to its own tx.
4630        assert_eq!(
4631            db.node_count(),
4632            1,
4633            "PENDING nodes should be invisible to non-versioned node_count()"
4634        );
4635        let epoch = db.store().current_epoch();
4636        assert!(
4637            db.store()
4638                .get_node_versioned(node_in_tx, epoch, transaction_id)
4639                .is_some(),
4640            "Transaction node should be visible to its own transaction"
4641        );
4642
4643        // Rollback the transaction
4644        session.rollback().unwrap();
4645
4646        // The node created via session.create_node() should be discarded
4647        let count_after = db.node_count();
4648        assert_eq!(
4649            count_after, 1,
4650            "Rollback should discard node created via session.create_node(), but got {count_after}"
4651        );
4652    }
4653
4654    #[test]
4655    fn test_session_create_node_with_props_in_transaction() {
4656        use grafeo_common::types::Value;
4657
4658        // Test that session.create_node_with_props() is transaction-aware
4659        let db = GrafeoDB::new_in_memory();
4660
4661        // Create a node outside of any transaction
4662        db.create_node(&["Person"]);
4663        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4664
4665        // Start a transaction and create a node with properties
4666        let mut session = db.session();
4667        session.begin_transaction().unwrap();
4668        let transaction_id = session.current_transaction.lock().unwrap();
4669
4670        let node_in_tx =
4671            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4672        assert!(node_in_tx.is_valid());
4673
4674        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4675        // non-versioned lookups. Verify the node is visible only to its own tx.
4676        assert_eq!(
4677            db.node_count(),
4678            1,
4679            "PENDING nodes should be invisible to non-versioned node_count()"
4680        );
4681        let epoch = db.store().current_epoch();
4682        assert!(
4683            db.store()
4684                .get_node_versioned(node_in_tx, epoch, transaction_id)
4685                .is_some(),
4686            "Transaction node should be visible to its own transaction"
4687        );
4688
4689        // Rollback the transaction
4690        session.rollback().unwrap();
4691
4692        // The node should be discarded
4693        let count_after = db.node_count();
4694        assert_eq!(
4695            count_after, 1,
4696            "Rollback should discard node created via session.create_node_with_props()"
4697        );
4698    }
4699
4700    #[cfg(feature = "gql")]
4701    mod gql_tests {
4702        use super::*;
4703
4704        #[test]
4705        fn test_gql_query_execution() {
4706            let db = GrafeoDB::new_in_memory();
4707            let session = db.session();
4708
4709            // Create some test data
4710            session.create_node(&["Person"]);
4711            session.create_node(&["Person"]);
4712            session.create_node(&["Animal"]);
4713
4714            // Execute a GQL query
4715            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
4716
4717            // Should return 2 Person nodes
4718            assert_eq!(result.row_count(), 2);
4719            assert_eq!(result.column_count(), 1);
4720            assert_eq!(result.columns[0], "n");
4721        }
4722
4723        #[test]
4724        fn test_gql_empty_result() {
4725            let db = GrafeoDB::new_in_memory();
4726            let session = db.session();
4727
4728            // No data in database
4729            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
4730
4731            assert_eq!(result.row_count(), 0);
4732        }
4733
4734        #[test]
4735        fn test_gql_parse_error() {
4736            let db = GrafeoDB::new_in_memory();
4737            let session = db.session();
4738
4739            // Invalid GQL syntax
4740            let result = session.execute("MATCH (n RETURN n");
4741
4742            assert!(result.is_err());
4743        }
4744
4745        #[test]
4746        fn test_gql_relationship_traversal() {
4747            let db = GrafeoDB::new_in_memory();
4748            let session = db.session();
4749
4750            // Create a graph: Alix -> Gus, Alix -> Vincent
4751            let alix = session.create_node(&["Person"]);
4752            let gus = session.create_node(&["Person"]);
4753            let vincent = session.create_node(&["Person"]);
4754
4755            session.create_edge(alix, gus, "KNOWS");
4756            session.create_edge(alix, vincent, "KNOWS");
4757
4758            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
4759            let result = session
4760                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4761                .unwrap();
4762
4763            // Should return 2 rows (Alix->Gus, Alix->Vincent)
4764            assert_eq!(result.row_count(), 2);
4765            assert_eq!(result.column_count(), 2);
4766            assert_eq!(result.columns[0], "a");
4767            assert_eq!(result.columns[1], "b");
4768        }
4769
4770        #[test]
4771        fn test_gql_relationship_with_type_filter() {
4772            let db = GrafeoDB::new_in_memory();
4773            let session = db.session();
4774
4775            // Create a graph: Alix -KNOWS-> Gus, Alix -WORKS_WITH-> Vincent
4776            let alix = session.create_node(&["Person"]);
4777            let gus = session.create_node(&["Person"]);
4778            let vincent = session.create_node(&["Person"]);
4779
4780            session.create_edge(alix, gus, "KNOWS");
4781            session.create_edge(alix, vincent, "WORKS_WITH");
4782
4783            // Query only KNOWS relationships
4784            let result = session
4785                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4786                .unwrap();
4787
4788            // Should return only 1 row (Alix->Gus)
4789            assert_eq!(result.row_count(), 1);
4790        }
4791
4792        #[test]
4793        fn test_gql_semantic_error_undefined_variable() {
4794            let db = GrafeoDB::new_in_memory();
4795            let session = db.session();
4796
4797            // Reference undefined variable 'x' in RETURN
4798            let result = session.execute("MATCH (n:Person) RETURN x");
4799
4800            // Should fail with semantic error
4801            assert!(result.is_err());
4802            let Err(err) = result else {
4803                panic!("Expected error")
4804            };
4805            assert!(
4806                err.to_string().contains("Undefined variable"),
4807                "Expected undefined variable error, got: {}",
4808                err
4809            );
4810        }
4811
4812        #[test]
4813        fn test_gql_where_clause_property_filter() {
4814            use grafeo_common::types::Value;
4815
4816            let db = GrafeoDB::new_in_memory();
4817            let session = db.session();
4818
4819            // Create people with ages
4820            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
4821            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
4822            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
4823
4824            // Query with WHERE clause: age > 30
4825            let result = session
4826                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
4827                .unwrap();
4828
4829            // Should return 2 people (ages 35 and 45)
4830            assert_eq!(result.row_count(), 2);
4831        }
4832
4833        #[test]
4834        fn test_gql_where_clause_equality() {
4835            use grafeo_common::types::Value;
4836
4837            let db = GrafeoDB::new_in_memory();
4838            let session = db.session();
4839
4840            // Create people with names
4841            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4842            session.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
4843            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4844
4845            // Query with WHERE clause: name = "Alix"
4846            let result = session
4847                .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
4848                .unwrap();
4849
4850            // Should return 2 people named Alix
4851            assert_eq!(result.row_count(), 2);
4852        }
4853
4854        #[test]
4855        fn test_gql_return_property_access() {
4856            use grafeo_common::types::Value;
4857
4858            let db = GrafeoDB::new_in_memory();
4859            let session = db.session();
4860
4861            // Create people with names and ages
4862            session.create_node_with_props(
4863                &["Person"],
4864                [
4865                    ("name", Value::String("Alix".into())),
4866                    ("age", Value::Int64(30)),
4867                ],
4868            );
4869            session.create_node_with_props(
4870                &["Person"],
4871                [
4872                    ("name", Value::String("Gus".into())),
4873                    ("age", Value::Int64(25)),
4874                ],
4875            );
4876
4877            // Query returning properties
4878            let result = session
4879                .execute("MATCH (n:Person) RETURN n.name, n.age")
4880                .unwrap();
4881
4882            // Should return 2 rows with name and age columns
4883            assert_eq!(result.row_count(), 2);
4884            assert_eq!(result.column_count(), 2);
4885            assert_eq!(result.columns[0], "n.name");
4886            assert_eq!(result.columns[1], "n.age");
4887
4888            // Check that we get actual values
4889            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
4890            assert!(names.contains(&&Value::String("Alix".into())));
4891            assert!(names.contains(&&Value::String("Gus".into())));
4892        }
4893
4894        #[test]
4895        fn test_gql_return_mixed_expressions() {
4896            use grafeo_common::types::Value;
4897
4898            let db = GrafeoDB::new_in_memory();
4899            let session = db.session();
4900
4901            // Create a person
4902            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4903
4904            // Query returning both node and property
4905            let result = session
4906                .execute("MATCH (n:Person) RETURN n, n.name")
4907                .unwrap();
4908
4909            assert_eq!(result.row_count(), 1);
4910            assert_eq!(result.column_count(), 2);
4911            assert_eq!(result.columns[0], "n");
4912            assert_eq!(result.columns[1], "n.name");
4913
4914            // Second column should be the name
4915            assert_eq!(result.rows[0][1], Value::String("Alix".into()));
4916        }
4917    }
4918
4919    #[cfg(feature = "cypher")]
4920    mod cypher_tests {
4921        use super::*;
4922
4923        #[test]
4924        fn test_cypher_query_execution() {
4925            let db = GrafeoDB::new_in_memory();
4926            let session = db.session();
4927
4928            // Create some test data
4929            session.create_node(&["Person"]);
4930            session.create_node(&["Person"]);
4931            session.create_node(&["Animal"]);
4932
4933            // Execute a Cypher query
4934            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4935
4936            // Should return 2 Person nodes
4937            assert_eq!(result.row_count(), 2);
4938            assert_eq!(result.column_count(), 1);
4939            assert_eq!(result.columns[0], "n");
4940        }
4941
4942        #[test]
4943        fn test_cypher_empty_result() {
4944            let db = GrafeoDB::new_in_memory();
4945            let session = db.session();
4946
4947            // No data in database
4948            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4949
4950            assert_eq!(result.row_count(), 0);
4951        }
4952
4953        #[test]
4954        fn test_cypher_parse_error() {
4955            let db = GrafeoDB::new_in_memory();
4956            let session = db.session();
4957
4958            // Invalid Cypher syntax
4959            let result = session.execute_cypher("MATCH (n RETURN n");
4960
4961            assert!(result.is_err());
4962        }
4963    }
4964
4965    // ==================== Direct Lookup API Tests ====================
4966
4967    mod direct_lookup_tests {
4968        use super::*;
4969        use grafeo_common::types::Value;
4970
4971        #[test]
4972        fn test_get_node() {
4973            let db = GrafeoDB::new_in_memory();
4974            let session = db.session();
4975
4976            let id = session.create_node(&["Person"]);
4977            let node = session.get_node(id);
4978
4979            assert!(node.is_some());
4980            let node = node.unwrap();
4981            assert_eq!(node.id, id);
4982        }
4983
4984        #[test]
4985        fn test_get_node_not_found() {
4986            use grafeo_common::types::NodeId;
4987
4988            let db = GrafeoDB::new_in_memory();
4989            let session = db.session();
4990
4991            // Try to get a non-existent node
4992            let node = session.get_node(NodeId::new(9999));
4993            assert!(node.is_none());
4994        }
4995
4996        #[test]
4997        fn test_get_node_property() {
4998            let db = GrafeoDB::new_in_memory();
4999            let session = db.session();
5000
5001            let id = session
5002                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
5003
5004            let name = session.get_node_property(id, "name");
5005            assert_eq!(name, Some(Value::String("Alix".into())));
5006
5007            // Non-existent property
5008            let missing = session.get_node_property(id, "missing");
5009            assert!(missing.is_none());
5010        }
5011
5012        #[test]
5013        fn test_get_edge() {
5014            let db = GrafeoDB::new_in_memory();
5015            let session = db.session();
5016
5017            let alix = session.create_node(&["Person"]);
5018            let gus = session.create_node(&["Person"]);
5019            let edge_id = session.create_edge(alix, gus, "KNOWS");
5020
5021            let edge = session.get_edge(edge_id);
5022            assert!(edge.is_some());
5023            let edge = edge.unwrap();
5024            assert_eq!(edge.id, edge_id);
5025            assert_eq!(edge.src, alix);
5026            assert_eq!(edge.dst, gus);
5027        }
5028
5029        #[test]
5030        fn test_get_edge_not_found() {
5031            use grafeo_common::types::EdgeId;
5032
5033            let db = GrafeoDB::new_in_memory();
5034            let session = db.session();
5035
5036            let edge = session.get_edge(EdgeId::new(9999));
5037            assert!(edge.is_none());
5038        }
5039
5040        #[test]
5041        fn test_get_neighbors_outgoing() {
5042            let db = GrafeoDB::new_in_memory();
5043            let session = db.session();
5044
5045            let alix = session.create_node(&["Person"]);
5046            let gus = session.create_node(&["Person"]);
5047            let harm = session.create_node(&["Person"]);
5048
5049            session.create_edge(alix, gus, "KNOWS");
5050            session.create_edge(alix, harm, "KNOWS");
5051
5052            let neighbors = session.get_neighbors_outgoing(alix);
5053            assert_eq!(neighbors.len(), 2);
5054
5055            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5056            assert!(neighbor_ids.contains(&gus));
5057            assert!(neighbor_ids.contains(&harm));
5058        }
5059
5060        #[test]
5061        fn test_get_neighbors_incoming() {
5062            let db = GrafeoDB::new_in_memory();
5063            let session = db.session();
5064
5065            let alix = session.create_node(&["Person"]);
5066            let gus = session.create_node(&["Person"]);
5067            let harm = session.create_node(&["Person"]);
5068
5069            session.create_edge(gus, alix, "KNOWS");
5070            session.create_edge(harm, alix, "KNOWS");
5071
5072            let neighbors = session.get_neighbors_incoming(alix);
5073            assert_eq!(neighbors.len(), 2);
5074
5075            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
5076            assert!(neighbor_ids.contains(&gus));
5077            assert!(neighbor_ids.contains(&harm));
5078        }
5079
5080        #[test]
5081        fn test_get_neighbors_outgoing_by_type() {
5082            let db = GrafeoDB::new_in_memory();
5083            let session = db.session();
5084
5085            let alix = session.create_node(&["Person"]);
5086            let gus = session.create_node(&["Person"]);
5087            let company = session.create_node(&["Company"]);
5088
5089            session.create_edge(alix, gus, "KNOWS");
5090            session.create_edge(alix, company, "WORKS_AT");
5091
5092            let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
5093            assert_eq!(knows_neighbors.len(), 1);
5094            assert_eq!(knows_neighbors[0].0, gus);
5095
5096            let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
5097            assert_eq!(works_neighbors.len(), 1);
5098            assert_eq!(works_neighbors[0].0, company);
5099
5100            // No edges of this type
5101            let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
5102            assert!(no_neighbors.is_empty());
5103        }
5104
5105        #[test]
5106        fn test_node_exists() {
5107            use grafeo_common::types::NodeId;
5108
5109            let db = GrafeoDB::new_in_memory();
5110            let session = db.session();
5111
5112            let id = session.create_node(&["Person"]);
5113
5114            assert!(session.node_exists(id));
5115            assert!(!session.node_exists(NodeId::new(9999)));
5116        }
5117
5118        #[test]
5119        fn test_edge_exists() {
5120            use grafeo_common::types::EdgeId;
5121
5122            let db = GrafeoDB::new_in_memory();
5123            let session = db.session();
5124
5125            let alix = session.create_node(&["Person"]);
5126            let gus = session.create_node(&["Person"]);
5127            let edge_id = session.create_edge(alix, gus, "KNOWS");
5128
5129            assert!(session.edge_exists(edge_id));
5130            assert!(!session.edge_exists(EdgeId::new(9999)));
5131        }
5132
5133        #[test]
5134        fn test_get_degree() {
5135            let db = GrafeoDB::new_in_memory();
5136            let session = db.session();
5137
5138            let alix = session.create_node(&["Person"]);
5139            let gus = session.create_node(&["Person"]);
5140            let harm = session.create_node(&["Person"]);
5141
5142            // Alix knows Gus and Harm (2 outgoing)
5143            session.create_edge(alix, gus, "KNOWS");
5144            session.create_edge(alix, harm, "KNOWS");
5145            // Gus knows Alix (1 incoming for Alix)
5146            session.create_edge(gus, alix, "KNOWS");
5147
5148            let (out_degree, in_degree) = session.get_degree(alix);
5149            assert_eq!(out_degree, 2);
5150            assert_eq!(in_degree, 1);
5151
5152            // Node with no edges
5153            let lonely = session.create_node(&["Person"]);
5154            let (out, in_deg) = session.get_degree(lonely);
5155            assert_eq!(out, 0);
5156            assert_eq!(in_deg, 0);
5157        }
5158
5159        #[test]
5160        fn test_get_nodes_batch() {
5161            let db = GrafeoDB::new_in_memory();
5162            let session = db.session();
5163
5164            let alix = session.create_node(&["Person"]);
5165            let gus = session.create_node(&["Person"]);
5166            let harm = session.create_node(&["Person"]);
5167
5168            let nodes = session.get_nodes_batch(&[alix, gus, harm]);
5169            assert_eq!(nodes.len(), 3);
5170            assert!(nodes[0].is_some());
5171            assert!(nodes[1].is_some());
5172            assert!(nodes[2].is_some());
5173
5174            // With non-existent node
5175            use grafeo_common::types::NodeId;
5176            let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
5177            assert_eq!(nodes_with_missing.len(), 3);
5178            assert!(nodes_with_missing[0].is_some());
5179            assert!(nodes_with_missing[1].is_none()); // Missing node
5180            assert!(nodes_with_missing[2].is_some());
5181        }
5182
5183        #[test]
5184        fn test_auto_commit_setting() {
5185            let db = GrafeoDB::new_in_memory();
5186            let mut session = db.session();
5187
5188            // Default is auto-commit enabled
5189            assert!(session.auto_commit());
5190
5191            session.set_auto_commit(false);
5192            assert!(!session.auto_commit());
5193
5194            session.set_auto_commit(true);
5195            assert!(session.auto_commit());
5196        }
5197
5198        #[test]
5199        fn test_transaction_double_begin_nests() {
5200            let db = GrafeoDB::new_in_memory();
5201            let mut session = db.session();
5202
5203            session.begin_transaction().unwrap();
5204            // Second begin_transaction creates a nested transaction (auto-savepoint)
5205            let result = session.begin_transaction();
5206            assert!(result.is_ok());
5207            // Commit the inner (releases savepoint)
5208            session.commit().unwrap();
5209            // Commit the outer
5210            session.commit().unwrap();
5211        }
5212
5213        #[test]
5214        fn test_commit_without_transaction_error() {
5215            let db = GrafeoDB::new_in_memory();
5216            let mut session = db.session();
5217
5218            let result = session.commit();
5219            assert!(result.is_err());
5220        }
5221
5222        #[test]
5223        fn test_rollback_without_transaction_error() {
5224            let db = GrafeoDB::new_in_memory();
5225            let mut session = db.session();
5226
5227            let result = session.rollback();
5228            assert!(result.is_err());
5229        }
5230
5231        #[test]
5232        fn test_create_edge_in_transaction() {
5233            let db = GrafeoDB::new_in_memory();
5234            let mut session = db.session();
5235
5236            // Create nodes outside transaction
5237            let alix = session.create_node(&["Person"]);
5238            let gus = session.create_node(&["Person"]);
5239
5240            // Create edge in transaction
5241            session.begin_transaction().unwrap();
5242            let edge_id = session.create_edge(alix, gus, "KNOWS");
5243
5244            // Edge should be visible in the transaction
5245            assert!(session.edge_exists(edge_id));
5246
5247            // Commit
5248            session.commit().unwrap();
5249
5250            // Edge should still be visible
5251            assert!(session.edge_exists(edge_id));
5252        }
5253
5254        #[test]
5255        fn test_neighbors_empty_node() {
5256            let db = GrafeoDB::new_in_memory();
5257            let session = db.session();
5258
5259            let lonely = session.create_node(&["Person"]);
5260
5261            assert!(session.get_neighbors_outgoing(lonely).is_empty());
5262            assert!(session.get_neighbors_incoming(lonely).is_empty());
5263            assert!(
5264                session
5265                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
5266                    .is_empty()
5267            );
5268        }
5269    }
5270
5271    #[test]
5272    fn test_auto_gc_triggers_on_commit_interval() {
5273        use crate::config::Config;
5274
5275        let config = Config::in_memory().with_gc_interval(2);
5276        let db = GrafeoDB::with_config(config).unwrap();
5277        let mut session = db.session();
5278
5279        // First commit: counter = 1, no GC (not a multiple of 2)
5280        session.begin_transaction().unwrap();
5281        session.create_node(&["A"]);
5282        session.commit().unwrap();
5283
5284        // Second commit: counter = 2, GC should trigger (multiple of 2)
5285        session.begin_transaction().unwrap();
5286        session.create_node(&["B"]);
5287        session.commit().unwrap();
5288
5289        // Verify the database is still functional after GC
5290        assert_eq!(db.node_count(), 2);
5291    }
5292
5293    #[test]
5294    fn test_query_timeout_config_propagates_to_session() {
5295        use crate::config::Config;
5296        use std::time::Duration;
5297
5298        let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
5299        let db = GrafeoDB::with_config(config).unwrap();
5300        let session = db.session();
5301
5302        // Verify the session has a query deadline (timeout was set)
5303        assert!(session.query_deadline().is_some());
5304    }
5305
5306    #[test]
5307    fn test_no_query_timeout_returns_no_deadline() {
5308        let db = GrafeoDB::new_in_memory();
5309        let session = db.session();
5310
5311        // Default config has no timeout
5312        assert!(session.query_deadline().is_none());
5313    }
5314
5315    #[test]
5316    fn test_graph_model_accessor() {
5317        use crate::config::GraphModel;
5318
5319        let db = GrafeoDB::new_in_memory();
5320        let session = db.session();
5321
5322        assert_eq!(session.graph_model(), GraphModel::Lpg);
5323    }
5324
5325    #[cfg(feature = "gql")]
5326    #[test]
5327    fn test_external_store_session() {
5328        use grafeo_core::graph::GraphStoreMut;
5329        use std::sync::Arc;
5330
5331        let config = crate::config::Config::in_memory();
5332        let store =
5333            Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
5334        let db = GrafeoDB::with_store(store, config).unwrap();
5335
5336        let mut session = db.session();
5337
5338        // Use an explicit transaction so that INSERT and MATCH share the same
5339        // transaction context. With PENDING epochs, uncommitted versions are
5340        // only visible to the owning transaction.
5341        session.begin_transaction().unwrap();
5342        session.execute("INSERT (:Test {name: 'hello'})").unwrap();
5343
5344        // Verify we can query through it within the same transaction
5345        let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
5346        assert_eq!(result.row_count(), 1);
5347
5348        session.commit().unwrap();
5349    }
5350
5351    // ==================== Session Command Tests ====================
5352
5353    #[cfg(feature = "gql")]
5354    mod session_command_tests {
5355        use super::*;
5356        use grafeo_common::types::Value;
5357
5358        #[test]
5359        fn test_use_graph_sets_current_graph() {
5360            let db = GrafeoDB::new_in_memory();
5361            let session = db.session();
5362
5363            // Create the graph first, then USE it
5364            session.execute("CREATE GRAPH mydb").unwrap();
5365            session.execute("USE GRAPH mydb").unwrap();
5366
5367            assert_eq!(session.current_graph(), Some("mydb".to_string()));
5368        }
5369
5370        #[test]
5371        fn test_use_graph_nonexistent_errors() {
5372            let db = GrafeoDB::new_in_memory();
5373            let session = db.session();
5374
5375            let result = session.execute("USE GRAPH doesnotexist");
5376            assert!(result.is_err());
5377            let err = result.unwrap_err().to_string();
5378            assert!(
5379                err.contains("does not exist"),
5380                "Expected 'does not exist' error, got: {err}"
5381            );
5382        }
5383
5384        #[test]
5385        fn test_use_graph_default_always_valid() {
5386            let db = GrafeoDB::new_in_memory();
5387            let session = db.session();
5388
5389            // "default" is always valid, even without CREATE GRAPH
5390            session.execute("USE GRAPH default").unwrap();
5391            assert_eq!(session.current_graph(), Some("default".to_string()));
5392        }
5393
5394        #[test]
5395        fn test_session_set_graph() {
5396            let db = GrafeoDB::new_in_memory();
5397            let session = db.session();
5398
5399            session.execute("CREATE GRAPH analytics").unwrap();
5400            session.execute("SESSION SET GRAPH analytics").unwrap();
5401            assert_eq!(session.current_graph(), Some("analytics".to_string()));
5402        }
5403
5404        #[test]
5405        fn test_session_set_graph_nonexistent_errors() {
5406            let db = GrafeoDB::new_in_memory();
5407            let session = db.session();
5408
5409            let result = session.execute("SESSION SET GRAPH nosuchgraph");
5410            assert!(result.is_err());
5411        }
5412
5413        #[test]
5414        fn test_session_set_time_zone() {
5415            let db = GrafeoDB::new_in_memory();
5416            let session = db.session();
5417
5418            assert_eq!(session.time_zone(), None);
5419
5420            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5421            assert_eq!(session.time_zone(), Some("UTC".to_string()));
5422
5423            session
5424                .execute("SESSION SET TIME ZONE 'America/New_York'")
5425                .unwrap();
5426            assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
5427        }
5428
5429        #[test]
5430        fn test_session_set_parameter() {
5431            let db = GrafeoDB::new_in_memory();
5432            let session = db.session();
5433
5434            session
5435                .execute("SESSION SET PARAMETER $timeout = 30")
5436                .unwrap();
5437
5438            // Parameter is stored (value is Null for now, since expression
5439            // evaluation is not yet wired up)
5440            assert!(session.get_parameter("timeout").is_some());
5441        }
5442
5443        #[test]
5444        fn test_session_reset_clears_all_state() {
5445            let db = GrafeoDB::new_in_memory();
5446            let session = db.session();
5447
5448            // Set various session state
5449            session.execute("CREATE GRAPH analytics").unwrap();
5450            session.execute("SESSION SET GRAPH analytics").unwrap();
5451            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5452            session
5453                .execute("SESSION SET PARAMETER $limit = 100")
5454                .unwrap();
5455
5456            // Verify state was set
5457            assert!(session.current_graph().is_some());
5458            assert!(session.time_zone().is_some());
5459            assert!(session.get_parameter("limit").is_some());
5460
5461            // Reset everything
5462            session.execute("SESSION RESET").unwrap();
5463
5464            assert_eq!(session.current_graph(), None);
5465            assert_eq!(session.time_zone(), None);
5466            assert!(session.get_parameter("limit").is_none());
5467        }
5468
5469        #[test]
5470        fn test_session_close_clears_state() {
5471            let db = GrafeoDB::new_in_memory();
5472            let session = db.session();
5473
5474            session.execute("CREATE GRAPH analytics").unwrap();
5475            session.execute("SESSION SET GRAPH analytics").unwrap();
5476            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5477
5478            session.execute("SESSION CLOSE").unwrap();
5479
5480            assert_eq!(session.current_graph(), None);
5481            assert_eq!(session.time_zone(), None);
5482        }
5483
5484        #[test]
5485        fn test_create_graph() {
5486            let db = GrafeoDB::new_in_memory();
5487            let session = db.session();
5488
5489            session.execute("CREATE GRAPH mydb").unwrap();
5490
5491            // Should be able to USE it now
5492            session.execute("USE GRAPH mydb").unwrap();
5493            assert_eq!(session.current_graph(), Some("mydb".to_string()));
5494        }
5495
5496        #[test]
5497        fn test_create_graph_duplicate_errors() {
5498            let db = GrafeoDB::new_in_memory();
5499            let session = db.session();
5500
5501            session.execute("CREATE GRAPH mydb").unwrap();
5502            let result = session.execute("CREATE GRAPH mydb");
5503
5504            assert!(result.is_err());
5505            let err = result.unwrap_err().to_string();
5506            assert!(
5507                err.contains("already exists"),
5508                "Expected 'already exists' error, got: {err}"
5509            );
5510        }
5511
5512        #[test]
5513        fn test_create_graph_if_not_exists() {
5514            let db = GrafeoDB::new_in_memory();
5515            let session = db.session();
5516
5517            session.execute("CREATE GRAPH mydb").unwrap();
5518            // Should succeed silently with IF NOT EXISTS
5519            session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
5520        }
5521
5522        #[test]
5523        fn test_drop_graph() {
5524            let db = GrafeoDB::new_in_memory();
5525            let session = db.session();
5526
5527            session.execute("CREATE GRAPH mydb").unwrap();
5528            session.execute("DROP GRAPH mydb").unwrap();
5529
5530            // Should no longer be usable
5531            let result = session.execute("USE GRAPH mydb");
5532            assert!(result.is_err());
5533        }
5534
5535        #[test]
5536        fn test_drop_graph_nonexistent_errors() {
5537            let db = GrafeoDB::new_in_memory();
5538            let session = db.session();
5539
5540            let result = session.execute("DROP GRAPH nosuchgraph");
5541            assert!(result.is_err());
5542            let err = result.unwrap_err().to_string();
5543            assert!(
5544                err.contains("does not exist"),
5545                "Expected 'does not exist' error, got: {err}"
5546            );
5547        }
5548
5549        #[test]
5550        fn test_drop_graph_if_exists() {
5551            let db = GrafeoDB::new_in_memory();
5552            let session = db.session();
5553
5554            // Should succeed silently with IF EXISTS
5555            session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
5556        }
5557
5558        #[test]
5559        fn test_start_transaction_via_gql() {
5560            let db = GrafeoDB::new_in_memory();
5561            let session = db.session();
5562
5563            session.execute("START TRANSACTION").unwrap();
5564            assert!(session.in_transaction());
5565            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5566            session.execute("COMMIT").unwrap();
5567            assert!(!session.in_transaction());
5568
5569            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5570            assert_eq!(result.rows.len(), 1);
5571        }
5572
5573        #[test]
5574        fn test_start_transaction_read_only_blocks_insert() {
5575            let db = GrafeoDB::new_in_memory();
5576            let session = db.session();
5577
5578            session.execute("START TRANSACTION READ ONLY").unwrap();
5579            let result = session.execute("INSERT (:Person {name: 'Alix'})");
5580            assert!(result.is_err());
5581            let err = result.unwrap_err().to_string();
5582            assert!(
5583                err.contains("read-only"),
5584                "Expected read-only error, got: {err}"
5585            );
5586            session.execute("ROLLBACK").unwrap();
5587        }
5588
5589        #[test]
5590        fn test_start_transaction_read_only_allows_reads() {
5591            let db = GrafeoDB::new_in_memory();
5592            let mut session = db.session();
5593            session.begin_transaction().unwrap();
5594            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5595            session.commit().unwrap();
5596
5597            session.execute("START TRANSACTION READ ONLY").unwrap();
5598            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5599            assert_eq!(result.rows.len(), 1);
5600            session.execute("COMMIT").unwrap();
5601        }
5602
5603        #[test]
5604        fn test_rollback_via_gql() {
5605            let db = GrafeoDB::new_in_memory();
5606            let session = db.session();
5607
5608            session.execute("START TRANSACTION").unwrap();
5609            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5610            session.execute("ROLLBACK").unwrap();
5611
5612            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5613            assert!(result.rows.is_empty());
5614        }
5615
5616        #[test]
5617        fn test_start_transaction_with_isolation_level() {
5618            let db = GrafeoDB::new_in_memory();
5619            let session = db.session();
5620
5621            session
5622                .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
5623                .unwrap();
5624            assert!(session.in_transaction());
5625            session.execute("ROLLBACK").unwrap();
5626        }
5627
5628        #[test]
5629        fn test_session_commands_return_empty_result() {
5630            let db = GrafeoDB::new_in_memory();
5631            let session = db.session();
5632
5633            session.execute("CREATE GRAPH test").unwrap();
5634            let result = session.execute("SESSION SET GRAPH test").unwrap();
5635            assert_eq!(result.row_count(), 0);
5636            assert_eq!(result.column_count(), 0);
5637        }
5638
5639        #[test]
5640        fn test_current_graph_default_is_none() {
5641            let db = GrafeoDB::new_in_memory();
5642            let session = db.session();
5643
5644            assert_eq!(session.current_graph(), None);
5645        }
5646
5647        #[test]
5648        fn test_time_zone_default_is_none() {
5649            let db = GrafeoDB::new_in_memory();
5650            let session = db.session();
5651
5652            assert_eq!(session.time_zone(), None);
5653        }
5654
5655        #[test]
5656        fn test_session_state_independent_across_sessions() {
5657            let db = GrafeoDB::new_in_memory();
5658            let session1 = db.session();
5659            let session2 = db.session();
5660
5661            session1.execute("CREATE GRAPH first").unwrap();
5662            session1.execute("CREATE GRAPH second").unwrap();
5663            session1.execute("SESSION SET GRAPH first").unwrap();
5664            session2.execute("SESSION SET GRAPH second").unwrap();
5665
5666            assert_eq!(session1.current_graph(), Some("first".to_string()));
5667            assert_eq!(session2.current_graph(), Some("second".to_string()));
5668        }
5669
5670        #[test]
5671        fn test_show_node_types() {
5672            let db = GrafeoDB::new_in_memory();
5673            let session = db.session();
5674
5675            session
5676                .execute("CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)")
5677                .unwrap();
5678
5679            let result = session.execute("SHOW NODE TYPES").unwrap();
5680            assert_eq!(
5681                result.columns,
5682                vec!["name", "properties", "constraints", "parents"]
5683            );
5684            assert_eq!(result.rows.len(), 1);
5685            // First column is the type name
5686            assert_eq!(result.rows[0][0], Value::from("Person"));
5687        }
5688
5689        #[test]
5690        fn test_show_edge_types() {
5691            let db = GrafeoDB::new_in_memory();
5692            let session = db.session();
5693
5694            session
5695                .execute("CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)")
5696                .unwrap();
5697
5698            let result = session.execute("SHOW EDGE TYPES").unwrap();
5699            assert_eq!(
5700                result.columns,
5701                vec!["name", "properties", "source_types", "target_types"]
5702            );
5703            assert_eq!(result.rows.len(), 1);
5704            assert_eq!(result.rows[0][0], Value::from("KNOWS"));
5705        }
5706
5707        #[test]
5708        fn test_show_graph_types() {
5709            let db = GrafeoDB::new_in_memory();
5710            let session = db.session();
5711
5712            session
5713                .execute("CREATE NODE TYPE Person (name STRING)")
5714                .unwrap();
5715            session
5716                .execute(
5717                    "CREATE GRAPH TYPE social (\
5718                        NODE TYPE Person (name STRING)\
5719                    )",
5720                )
5721                .unwrap();
5722
5723            let result = session.execute("SHOW GRAPH TYPES").unwrap();
5724            assert_eq!(
5725                result.columns,
5726                vec!["name", "open", "node_types", "edge_types"]
5727            );
5728            assert_eq!(result.rows.len(), 1);
5729            assert_eq!(result.rows[0][0], Value::from("social"));
5730        }
5731
5732        #[test]
5733        fn test_show_graph_type_named() {
5734            let db = GrafeoDB::new_in_memory();
5735            let session = db.session();
5736
5737            session
5738                .execute("CREATE NODE TYPE Person (name STRING)")
5739                .unwrap();
5740            session
5741                .execute(
5742                    "CREATE GRAPH TYPE social (\
5743                        NODE TYPE Person (name STRING)\
5744                    )",
5745                )
5746                .unwrap();
5747
5748            let result = session.execute("SHOW GRAPH TYPE social").unwrap();
5749            assert_eq!(result.rows.len(), 1);
5750            assert_eq!(result.rows[0][0], Value::from("social"));
5751        }
5752
5753        #[test]
5754        fn test_show_graph_type_not_found() {
5755            let db = GrafeoDB::new_in_memory();
5756            let session = db.session();
5757
5758            let result = session.execute("SHOW GRAPH TYPE nonexistent");
5759            assert!(result.is_err());
5760        }
5761
5762        #[test]
5763        fn test_show_indexes_via_gql() {
5764            let db = GrafeoDB::new_in_memory();
5765            let session = db.session();
5766
5767            let result = session.execute("SHOW INDEXES").unwrap();
5768            assert_eq!(result.columns, vec!["name", "type", "label", "property"]);
5769        }
5770
5771        #[test]
5772        fn test_show_constraints_via_gql() {
5773            let db = GrafeoDB::new_in_memory();
5774            let session = db.session();
5775
5776            let result = session.execute("SHOW CONSTRAINTS").unwrap();
5777            assert_eq!(result.columns, vec!["name", "type", "label", "properties"]);
5778        }
5779
5780        #[test]
5781        fn test_pattern_form_graph_type_roundtrip() {
5782            let db = GrafeoDB::new_in_memory();
5783            let session = db.session();
5784
5785            // Register the types first
5786            session
5787                .execute("CREATE NODE TYPE Person (name STRING NOT NULL)")
5788                .unwrap();
5789            session
5790                .execute("CREATE NODE TYPE City (name STRING)")
5791                .unwrap();
5792            session
5793                .execute("CREATE EDGE TYPE KNOWS (since INTEGER)")
5794                .unwrap();
5795            session.execute("CREATE EDGE TYPE LIVES_IN").unwrap();
5796
5797            // Create graph type using pattern form
5798            session
5799                .execute(
5800                    "CREATE GRAPH TYPE social (\
5801                        (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
5802                        (:Person)-[:LIVES_IN]->(:City)\
5803                    )",
5804                )
5805                .unwrap();
5806
5807            // Verify it was created
5808            let result = session.execute("SHOW GRAPH TYPE social").unwrap();
5809            assert_eq!(result.rows.len(), 1);
5810            assert_eq!(result.rows[0][0], Value::from("social"));
5811        }
5812    }
5813}