Skip to main content

grafeo_engine/session/
mod.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7#[cfg(feature = "rdf")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::time::{Duration, Instant};
13
14use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId, Value};
15use grafeo_common::utils::error::Result;
16use grafeo_core::graph::Direction;
17use grafeo_core::graph::GraphStoreMut;
18use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
19#[cfg(feature = "rdf")]
20use grafeo_core::graph::rdf::RdfStore;
21
22use crate::catalog::{Catalog, CatalogConstraintValidator};
23use crate::config::{AdaptiveConfig, GraphModel};
24use crate::database::QueryResult;
25use crate::query::cache::QueryCache;
26use crate::transaction::TransactionManager;
27
28/// Parses a DDL default-value literal string into a [`Value`].
29///
30/// Handles string literals (single- or double-quoted), integers, floats,
31/// booleans (`true`/`false`), and `NULL`.
32fn parse_default_literal(text: &str) -> Value {
33    if text.eq_ignore_ascii_case("null") {
34        return Value::Null;
35    }
36    if text.eq_ignore_ascii_case("true") {
37        return Value::Bool(true);
38    }
39    if text.eq_ignore_ascii_case("false") {
40        return Value::Bool(false);
41    }
42    // String literal: strip surrounding quotes
43    if (text.starts_with('\'') && text.ends_with('\''))
44        || (text.starts_with('"') && text.ends_with('"'))
45    {
46        return Value::String(text[1..text.len() - 1].into());
47    }
48    // Try integer, then float
49    if let Ok(i) = text.parse::<i64>() {
50        return Value::Int64(i);
51    }
52    if let Ok(f) = text.parse::<f64>() {
53        return Value::Float64(f);
54    }
55    // Fallback: treat as string
56    Value::String(text.into())
57}
58
59/// Runtime configuration for creating a new session.
60///
61/// Groups the shared parameters passed to all session constructors, keeping
62/// call sites readable and avoiding long argument lists.
63pub(crate) struct SessionConfig {
    /// Transaction manager handle stored on the session.
64    pub transaction_manager: Arc<TransactionManager>,
    /// Query cache handle stored on the session.
65    pub query_cache: Arc<QueryCache>,
    /// Schema and metadata catalog handle stored on the session.
66    pub catalog: Arc<Catalog>,
    /// Adaptive execution configuration copied into the session.
67    pub adaptive_config: AdaptiveConfig,
    /// Whether the session uses factorized execution.
68    pub factorized_execution: bool,
    /// Graph data model the session operates on.
69    pub graph_model: GraphModel,
    /// Per-query time limit; `None` leaves the limit unset.
70    pub query_timeout: Option<Duration>,
    /// Commit counter handle stored on the session.
71    pub commit_counter: Arc<AtomicUsize>,
    /// Garbage-collection interval, stored as the session's `gc_interval`.
72    pub gc_interval: usize,
73    /// When true, the session permanently blocks all mutations.
    /// The constructors copy this into both `read_only_tx` and `db_read_only`.
74    pub read_only: bool,
75}
76
77/// Your handle to the database - execute queries and manage transactions.
78///
79/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
80/// tracks its own transaction state, so you can have multiple concurrent
81/// sessions without them interfering.
82pub struct Session {
83    /// The underlying (root) LPG store; named graphs are looked up through it.
84    store: Arc<LpgStore>,
85    /// Graph store trait object for pluggable storage backends.
    /// Replaced by a WAL-logging wrapper when `set_wal` is called.
86    graph_store: Arc<dyn GraphStoreMut>,
87    /// Schema and metadata catalog shared across sessions.
88    catalog: Arc<Catalog>,
89    /// RDF triple store (if RDF feature is enabled).
90    #[cfg(feature = "rdf")]
91    rdf_store: Arc<RdfStore>,
92    /// Transaction manager handle supplied through `SessionConfig`.
93    transaction_manager: Arc<TransactionManager>,
94    /// Query cache shared across sessions.
95    query_cache: Arc<QueryCache>,
96    /// Current transaction ID (if any). Behind a Mutex so that GQL commands
97    /// (`START TRANSACTION`, `COMMIT`, `ROLLBACK`) can manage transactions
98    /// from within `execute(&self)`.
99    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
100    /// Whether the current transaction is read-only (blocks mutations).
    /// Initialized from `SessionConfig::read_only` by the constructors.
101    read_only_tx: parking_lot::Mutex<bool>,
102    /// Whether the database itself is read-only (set at open time, never changes).
103    /// When true, `read_only_tx` is always true regardless of transaction flags.
104    db_read_only: bool,
105    /// Whether the session is in auto-commit mode.
    /// The constructors in this file initialize it to `true`.
106    auto_commit: bool,
107    /// Adaptive execution configuration.
108    #[allow(dead_code)] // Stored for future adaptive re-optimization during execution
109    adaptive_config: AdaptiveConfig,
110    /// Whether to use factorized execution for multi-hop queries.
111    factorized_execution: bool,
112    /// The graph data model this session operates on.
113    graph_model: GraphModel,
114    /// Maximum time a query may run before being cancelled.
115    query_timeout: Option<Duration>,
116    /// Shared commit counter for triggering auto-GC.
117    commit_counter: Arc<AtomicUsize>,
118    /// GC every N commits (0 = disabled).
119    gc_interval: usize,
120    /// Node count at the start of the current transaction (for PreparedCommit stats).
121    transaction_start_node_count: AtomicUsize,
122    /// Edge count at the start of the current transaction (for PreparedCommit stats).
123    transaction_start_edge_count: AtomicUsize,
124    /// WAL for logging schema changes. `None` until `set_wal` is called.
125    #[cfg(feature = "wal")]
126    wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
127    /// Shared WAL graph context tracker for named graph awareness.
    /// `None` until `set_wal` is called.
128    #[cfg(feature = "wal")]
129    wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
130    /// CDC log for change tracking.
131    #[cfg(feature = "cdc")]
132    cdc_log: Arc<crate::cdc::CdcLog>,
133    /// Current graph name (for multi-graph USE GRAPH support). None = default graph.
134    current_graph: parking_lot::Mutex<Option<String>>,
135    /// Current schema name (ISO/IEC 39075 Section 4.7.3: independent from session graph).
136    /// None = "not set" (uses default schema).
137    current_schema: parking_lot::Mutex<Option<String>>,
138    /// Session time zone override.
139    time_zone: parking_lot::Mutex<Option<String>>,
140    /// Session-level parameters (SET PARAMETER).
141    session_params:
142        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
143    /// Override epoch for time-travel queries (None = use transaction/current epoch).
144    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
145    /// Savepoints within the current transaction.
146    savepoints: parking_lot::Mutex<Vec<SavepointState>>,
147    /// Nesting depth for nested transactions (0 = outermost).
148    /// Nested `START TRANSACTION` creates an auto-savepoint; nested `COMMIT`
149    /// releases it, nested `ROLLBACK` rolls back to it.
150    transaction_nesting_depth: parking_lot::Mutex<u32>,
151    /// Named graphs touched during the current transaction (for cross-graph atomicity).
152    /// `None` represents the default graph. Populated at `BEGIN` time and on each
153    /// `USE GRAPH` / `SESSION SET GRAPH` switch within a transaction.
154    touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
155    /// Shared metrics registry (populated when the `metrics` feature is enabled).
    /// `None` until `set_metrics` is called.
156    #[cfg(feature = "metrics")]
157    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
158    /// Transaction start time for duration tracking.
159    #[cfg(feature = "metrics")]
160    tx_start_time: parking_lot::Mutex<Option<Instant>>,
161}
162
163/// Per-graph savepoint snapshot, capturing the store state at the time of the savepoint.
164#[derive(Clone)]
165struct GraphSavepoint {
    /// Graph this snapshot belongs to.
    /// NOTE(review): presumably `None` means the default graph, as in
    /// `Session::touched_graphs` - confirm against the savepoint code.
166    graph_name: Option<String>,
    /// NOTE(review): presumably the store's next node ID when the savepoint
    /// was taken - confirm where this is populated.
167    next_node_id: u64,
    /// NOTE(review): presumably the store's next edge ID when the savepoint
    /// was taken - confirm where this is populated.
168    next_edge_id: u64,
    /// NOTE(review): presumably the undo-log length at savepoint time, used
    /// as the rollback target - confirm where this is consumed.
169    undo_log_position: usize,
170}
171
172/// Savepoint state: name + per-graph snapshots + the graph that was active.
173#[derive(Clone)]
174struct SavepointState {
    /// Name of the savepoint.
175    name: String,
    /// One snapshot per graph captured for this savepoint.
176    graph_snapshots: Vec<GraphSavepoint>,
177    /// The graph that was active when the savepoint was created.
178    /// Reserved for future use (e.g., restoring graph context on rollback).
179    #[allow(dead_code)]
180    active_graph: Option<String>,
181}
182
183impl Session {
184    /// Creates a new session with adaptive execution configuration.
185    #[allow(dead_code)]
186    pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
187        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
188        Self {
189            store,
190            graph_store,
191            catalog: cfg.catalog,
192            #[cfg(feature = "rdf")]
193            rdf_store: Arc::new(RdfStore::new()),
194            transaction_manager: cfg.transaction_manager,
195            query_cache: cfg.query_cache,
196            current_transaction: parking_lot::Mutex::new(None),
197            read_only_tx: parking_lot::Mutex::new(cfg.read_only),
198            db_read_only: cfg.read_only,
199            auto_commit: true,
200            adaptive_config: cfg.adaptive_config,
201            factorized_execution: cfg.factorized_execution,
202            graph_model: cfg.graph_model,
203            query_timeout: cfg.query_timeout,
204            commit_counter: cfg.commit_counter,
205            gc_interval: cfg.gc_interval,
206            transaction_start_node_count: AtomicUsize::new(0),
207            transaction_start_edge_count: AtomicUsize::new(0),
208            #[cfg(feature = "wal")]
209            wal: None,
210            #[cfg(feature = "wal")]
211            wal_graph_context: None,
212            #[cfg(feature = "cdc")]
213            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
214            current_graph: parking_lot::Mutex::new(None),
215            current_schema: parking_lot::Mutex::new(None),
216            time_zone: parking_lot::Mutex::new(None),
217            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
218            viewing_epoch_override: parking_lot::Mutex::new(None),
219            savepoints: parking_lot::Mutex::new(Vec::new()),
220            transaction_nesting_depth: parking_lot::Mutex::new(0),
221            touched_graphs: parking_lot::Mutex::new(Vec::new()),
222            #[cfg(feature = "metrics")]
223            metrics: None,
224            #[cfg(feature = "metrics")]
225            tx_start_time: parking_lot::Mutex::new(None),
226        }
227    }
228
229    /// Sets the WAL for this session (shared with the database).
230    ///
231    /// This also wraps `graph_store` in a [`WalGraphStore`] so that mutation
232    /// operators (INSERT, DELETE, SET via queries) log to the WAL.
233    #[cfg(feature = "wal")]
234    pub(crate) fn set_wal(
235        &mut self,
236        wal: Arc<grafeo_adapters::storage::wal::LpgWal>,
237        wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
238    ) {
239        // Wrap the graph store so query-engine mutations are WAL-logged
240        self.graph_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
241            Arc::clone(&self.store),
242            Arc::clone(&wal),
243            Arc::clone(&wal_graph_context),
244        ));
245        self.wal = Some(wal);
246        self.wal_graph_context = Some(wal_graph_context);
247    }
248
249    /// Sets the CDC log for this session (shared with the database).
250    #[cfg(feature = "cdc")]
251    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
252        self.cdc_log = cdc_log;
253    }
254
255    /// Sets the metrics registry for this session (shared with the database).
256    #[cfg(feature = "metrics")]
257    pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
258        self.metrics = Some(metrics);
259    }
260
261    /// Creates a session backed by an external graph store.
262    ///
263    /// The external store handles all data operations. Transaction management
264    /// (begin/commit/rollback) is not supported for external stores.
265    ///
266    /// # Errors
267    ///
268    /// Returns an error if the internal arena allocation fails (out of memory).
269    pub(crate) fn with_external_store(
270        store: Arc<dyn GraphStoreMut>,
271        cfg: SessionConfig,
272    ) -> Result<Self> {
273        Ok(Self {
274            store: Arc::new(LpgStore::new()?),
275            graph_store: store,
276            catalog: cfg.catalog,
277            #[cfg(feature = "rdf")]
278            rdf_store: Arc::new(RdfStore::new()),
279            transaction_manager: cfg.transaction_manager,
280            query_cache: cfg.query_cache,
281            current_transaction: parking_lot::Mutex::new(None),
282            read_only_tx: parking_lot::Mutex::new(cfg.read_only),
283            db_read_only: cfg.read_only,
284            auto_commit: true,
285            adaptive_config: cfg.adaptive_config,
286            factorized_execution: cfg.factorized_execution,
287            graph_model: cfg.graph_model,
288            query_timeout: cfg.query_timeout,
289            commit_counter: cfg.commit_counter,
290            gc_interval: cfg.gc_interval,
291            transaction_start_node_count: AtomicUsize::new(0),
292            transaction_start_edge_count: AtomicUsize::new(0),
293            #[cfg(feature = "wal")]
294            wal: None,
295            #[cfg(feature = "wal")]
296            wal_graph_context: None,
297            #[cfg(feature = "cdc")]
298            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
299            current_graph: parking_lot::Mutex::new(None),
300            current_schema: parking_lot::Mutex::new(None),
301            time_zone: parking_lot::Mutex::new(None),
302            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
303            viewing_epoch_override: parking_lot::Mutex::new(None),
304            savepoints: parking_lot::Mutex::new(Vec::new()),
305            transaction_nesting_depth: parking_lot::Mutex::new(0),
306            touched_graphs: parking_lot::Mutex::new(Vec::new()),
307            #[cfg(feature = "metrics")]
308            metrics: None,
309            #[cfg(feature = "metrics")]
310            tx_start_time: parking_lot::Mutex::new(None),
311        })
312    }
313
314    /// Returns the graph model this session operates on.
315    #[must_use]
316    pub fn graph_model(&self) -> GraphModel {
317        self.graph_model
318    }
319
320    // === Session State Management ===
321
322    /// Sets the current graph for this session (USE GRAPH).
323    pub fn use_graph(&self, name: &str) {
324        *self.current_graph.lock() = Some(name.to_string());
325    }
326
327    /// Returns the current graph name, if any.
328    #[must_use]
329    pub fn current_graph(&self) -> Option<String> {
330        self.current_graph.lock().clone()
331    }
332
333    /// Sets the current schema for this session (SESSION SET SCHEMA).
334    ///
335    /// Per ISO/IEC 39075 Section 7.1 GR1, this is independent of the session graph.
336    pub fn set_schema(&self, name: &str) {
337        *self.current_schema.lock() = Some(name.to_string());
338    }
339
340    /// Returns the current schema name, if any.
341    ///
342    /// `None` means "not set", which resolves to the default schema.
343    #[must_use]
344    pub fn current_schema(&self) -> Option<String> {
345        self.current_schema.lock().clone()
346    }
347
348    /// Computes the effective storage key for a graph, accounting for schema context.
349    ///
350    /// Per ISO/IEC 39075 Section 17.2, graphs resolve relative to the current schema.
351    /// Uses `/` as separator since it is invalid in GQL identifiers.
352    fn effective_graph_key(&self, graph_name: &str) -> String {
353        let schema = self.current_schema.lock().clone();
354        match schema {
355            Some(s) => format!("{s}/{graph_name}"),
356            None => graph_name.to_string(),
357        }
358    }
359
360    /// Returns the effective storage key for the current graph, accounting for schema.
361    ///
362    /// Combines `current_schema` and `current_graph` into a flat lookup key.
363    fn active_graph_storage_key(&self) -> Option<String> {
364        let graph = self.current_graph.lock().clone();
365        let schema = self.current_schema.lock().clone();
366        match (schema, graph) {
367            (_, None) => None,
368            (_, Some(ref name)) if name.eq_ignore_ascii_case("default") => None,
369            (None, Some(name)) => Some(name),
370            (Some(s), Some(g)) => Some(format!("{s}/{g}")),
371        }
372    }
373
374    /// Returns the graph store for the currently active graph.
375    ///
376    /// If `current_graph` is `None` or `"default"`, returns the session's
377    /// default `graph_store` (already WAL-wrapped for the default graph).
378    /// Otherwise looks up the named graph in the root store and wraps it
379    /// in a [`WalGraphStore`] so mutations are WAL-logged with the correct
380    /// graph context.
381    fn active_store(&self) -> Arc<dyn GraphStoreMut> {
382        let key = self.active_graph_storage_key();
383        match key {
384            None => Arc::clone(&self.graph_store),
385            Some(ref name) => match self.store.graph(name) {
386                Some(named_store) => {
387                    #[cfg(feature = "wal")]
388                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
389                        return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
390                            named_store,
391                            Arc::clone(wal),
392                            name.clone(),
393                            Arc::clone(ctx),
394                        )) as Arc<dyn GraphStoreMut>;
395                    }
396                    named_store as Arc<dyn GraphStoreMut>
397                }
398                None => Arc::clone(&self.graph_store),
399            },
400        }
401    }
402
403    /// Returns the concrete `LpgStore` for the currently active graph.
404    ///
405    /// Used by direct CRUD methods that need the concrete store type
406    /// for versioned operations.
407    fn active_lpg_store(&self) -> Arc<LpgStore> {
408        let key = self.active_graph_storage_key();
409        match key {
410            None => Arc::clone(&self.store),
411            Some(ref name) => self
412                .store
413                .graph(name)
414                .unwrap_or_else(|| Arc::clone(&self.store)),
415        }
416    }
417
418    /// Resolves a graph name to a concrete `LpgStore`.
419    /// `None` and `"default"` resolve to the session's root store.
420    fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
421        match graph_name {
422            None => Arc::clone(&self.store),
423            Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
424            Some(name) => self
425                .store
426                .graph(name)
427                .unwrap_or_else(|| Arc::clone(&self.store)),
428        }
429    }
430
431    /// Records the current graph as "touched" if a transaction is active.
432    ///
433    /// Uses the full storage key (schema/graph) so that commit/rollback
434    /// can resolve the correct store via `resolve_store`.
435    fn track_graph_touch(&self) {
436        if self.current_transaction.lock().is_some() {
437            let key = self.active_graph_storage_key();
438            let mut touched = self.touched_graphs.lock();
439            if !touched.contains(&key) {
440                touched.push(key);
441            }
442        }
443    }
444
445    /// Sets the session time zone.
446    pub fn set_time_zone(&self, tz: &str) {
447        *self.time_zone.lock() = Some(tz.to_string());
448    }
449
450    /// Returns the session time zone, if set.
451    #[must_use]
452    pub fn time_zone(&self) -> Option<String> {
453        self.time_zone.lock().clone()
454    }
455
456    /// Sets a session parameter.
457    pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
458        self.session_params.lock().insert(key.to_string(), value);
459    }
460
461    /// Gets a session parameter by cloning it.
462    #[must_use]
463    pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
464        self.session_params.lock().get(key).cloned()
465    }
466
467    /// Resets all session state to defaults (ISO/IEC 39075 Section 7.2).
468    pub fn reset_session(&self) {
469        *self.current_schema.lock() = None;
470        *self.current_graph.lock() = None;
471        *self.time_zone.lock() = None;
472        self.session_params.lock().clear();
473        *self.viewing_epoch_override.lock() = None;
474    }
475
476    /// Resets only the session schema (Section 7.2 GR1).
477    pub fn reset_schema(&self) {
478        *self.current_schema.lock() = None;
479    }
480
481    /// Resets only the session graph (Section 7.2 GR2).
482    pub fn reset_graph(&self) {
483        *self.current_graph.lock() = None;
484    }
485
486    /// Resets only the session time zone (Section 7.2 GR3).
487    pub fn reset_time_zone(&self) {
488        *self.time_zone.lock() = None;
489    }
490
491    /// Resets only session parameters (Section 7.2 GR4).
492    pub fn reset_parameters(&self) {
493        self.session_params.lock().clear();
494    }
495
496    // --- Time-travel API ---
497
498    /// Sets a viewing epoch override for time-travel queries.
499    ///
500    /// While set, all queries on this session see the database as it existed
501    /// at the given epoch. Use [`clear_viewing_epoch`](Self::clear_viewing_epoch)
502    /// to return to normal behavior.
503    pub fn set_viewing_epoch(&self, epoch: EpochId) {
504        *self.viewing_epoch_override.lock() = Some(epoch);
505    }
506
507    /// Clears the viewing epoch override, returning to normal behavior.
508    pub fn clear_viewing_epoch(&self) {
509        *self.viewing_epoch_override.lock() = None;
510    }
511
512    /// Returns the current viewing epoch override, if any.
513    #[must_use]
514    pub fn viewing_epoch(&self) -> Option<EpochId> {
515        *self.viewing_epoch_override.lock()
516    }
517
518    /// Returns all versions of a node with their creation/deletion epochs.
519    ///
520    /// Properties and labels reflect the current state (not versioned per-epoch).
521    #[must_use]
522    pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
523        self.active_lpg_store().get_node_history(id)
524    }
525
526    /// Returns all versions of an edge with their creation/deletion epochs.
527    ///
528    /// Properties reflect the current state (not versioned per-epoch).
529    #[must_use]
530    pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
531        self.active_lpg_store().get_edge_history(id)
532    }
533
534    /// Checks that the session's graph model supports LPG operations.
535    fn require_lpg(&self, language: &str) -> Result<()> {
536        if self.graph_model == GraphModel::Rdf {
537            return Err(grafeo_common::utils::error::Error::Internal(format!(
538                "This is an RDF database. {language} queries require an LPG database."
539            )));
540        }
541        Ok(())
542    }
543
544    /// Executes a session or transaction command, returning an empty result.
545    #[cfg(feature = "gql")]
546    fn execute_session_command(
547        &self,
548        cmd: grafeo_adapters::query::gql::ast::SessionCommand,
549    ) -> Result<QueryResult> {
550        use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
551        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
552
553        // Block DDL in read-only transactions (ISO/IEC 39075 Section 8)
554        if *self.read_only_tx.lock() {
555            match &cmd {
556                SessionCommand::CreateGraph { .. } | SessionCommand::DropGraph { .. } => {
557                    return Err(Error::Transaction(
558                        grafeo_common::utils::error::TransactionError::ReadOnly,
559                    ));
560                }
561                _ => {} // Session state + transaction control allowed
562            }
563        }
564
565        match cmd {
566            SessionCommand::CreateGraph {
567                name,
568                if_not_exists,
569                typed,
570                like_graph,
571                copy_of,
572                open: _,
573            } => {
574                // ISO/IEC 39075 Section 12.4: graphs are created within the current schema
575                let storage_key = self.effective_graph_key(&name);
576
577                // Validate source graph exists for LIKE / AS COPY OF
578                if let Some(ref src) = like_graph {
579                    let src_key = self.effective_graph_key(src);
580                    if self.store.graph(&src_key).is_none() {
581                        return Err(Error::Query(QueryError::new(
582                            QueryErrorKind::Semantic,
583                            format!("Source graph '{src}' does not exist"),
584                        )));
585                    }
586                }
587                if let Some(ref src) = copy_of {
588                    let src_key = self.effective_graph_key(src);
589                    if self.store.graph(&src_key).is_none() {
590                        return Err(Error::Query(QueryError::new(
591                            QueryErrorKind::Semantic,
592                            format!("Source graph '{src}' does not exist"),
593                        )));
594                    }
595                }
596
597                let created = self
598                    .store
599                    .create_graph(&storage_key)
600                    .map_err(|e| Error::Internal(e.to_string()))?;
601                if !created && !if_not_exists {
602                    return Err(Error::Query(QueryError::new(
603                        QueryErrorKind::Semantic,
604                        format!("Graph '{name}' already exists"),
605                    )));
606                }
607                if created {
608                    #[cfg(feature = "wal")]
609                    self.log_schema_wal(
610                        &grafeo_adapters::storage::wal::WalRecord::CreateNamedGraph {
611                            name: storage_key.clone(),
612                        },
613                    );
614                }
615
616                // AS COPY OF: copy data from source graph
617                if let Some(ref src) = copy_of {
618                    let src_key = self.effective_graph_key(src);
619                    self.store
620                        .copy_graph(Some(&src_key), Some(&storage_key))
621                        .map_err(|e| Error::Internal(e.to_string()))?;
622                }
623
624                // Bind to graph type if specified
625                if let Some(type_name) = typed
626                    && let Err(e) = self
627                        .catalog
628                        .bind_graph_type(&storage_key, type_name.clone())
629                {
630                    return Err(Error::Query(QueryError::new(
631                        QueryErrorKind::Semantic,
632                        e.to_string(),
633                    )));
634                }
635
636                // LIKE: copy graph type binding from source
637                if let Some(ref src) = like_graph {
638                    let src_key = self.effective_graph_key(src);
639                    if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
640                        let _ = self.catalog.bind_graph_type(&storage_key, src_type);
641                    }
642                }
643
644                Ok(QueryResult::empty())
645            }
646            SessionCommand::DropGraph { name, if_exists } => {
647                let storage_key = self.effective_graph_key(&name);
648                let dropped = self.store.drop_graph(&storage_key);
649                if !dropped && !if_exists {
650                    return Err(Error::Query(QueryError::new(
651                        QueryErrorKind::Semantic,
652                        format!("Graph '{name}' does not exist"),
653                    )));
654                }
655                if dropped {
656                    #[cfg(feature = "wal")]
657                    self.log_schema_wal(
658                        &grafeo_adapters::storage::wal::WalRecord::DropNamedGraph {
659                            name: storage_key.clone(),
660                        },
661                    );
662                    // If this session was using the dropped graph, reset to default
663                    let mut current = self.current_graph.lock();
664                    if current
665                        .as_deref()
666                        .is_some_and(|g| g.eq_ignore_ascii_case(&name))
667                    {
668                        *current = None;
669                    }
670                }
671                Ok(QueryResult::empty())
672            }
673            SessionCommand::UseGraph(name) => {
674                // Verify graph exists (resolve within current schema)
675                let effective_key = self.effective_graph_key(&name);
676                if !name.eq_ignore_ascii_case("default")
677                    && self.store.graph(&effective_key).is_none()
678                {
679                    return Err(Error::Query(QueryError::new(
680                        QueryErrorKind::Semantic,
681                        format!("Graph '{name}' does not exist"),
682                    )));
683                }
684                self.use_graph(&name);
685                // Track the new graph if in a transaction
686                self.track_graph_touch();
687                Ok(QueryResult::empty())
688            }
689            SessionCommand::SessionSetGraph(name) => {
690                // ISO/IEC 39075 Section 7.1 GR2: set session graph (resolved within current schema)
691                let effective_key = self.effective_graph_key(&name);
692                if !name.eq_ignore_ascii_case("default")
693                    && self.store.graph(&effective_key).is_none()
694                {
695                    return Err(Error::Query(QueryError::new(
696                        QueryErrorKind::Semantic,
697                        format!("Graph '{name}' does not exist"),
698                    )));
699                }
700                self.use_graph(&name);
701                // Track the new graph if in a transaction
702                self.track_graph_touch();
703                Ok(QueryResult::empty())
704            }
705            SessionCommand::SessionSetSchema(name) => {
706                // ISO/IEC 39075 Section 7.1 GR1: set session schema (independent of graph)
707                if !self.catalog.schema_exists(&name) {
708                    return Err(Error::Query(QueryError::new(
709                        QueryErrorKind::Semantic,
710                        format!("Schema '{name}' does not exist"),
711                    )));
712                }
713                self.set_schema(&name);
714                Ok(QueryResult::empty())
715            }
716            SessionCommand::SessionSetTimeZone(tz) => {
717                self.set_time_zone(&tz);
718                Ok(QueryResult::empty())
719            }
720            SessionCommand::SessionSetParameter(key, expr) => {
721                if key.eq_ignore_ascii_case("viewing_epoch") {
722                    match Self::eval_integer_literal(&expr) {
723                        Some(n) if n >= 0 => {
724                            self.set_viewing_epoch(EpochId::new(n as u64));
725                            Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
726                        }
727                        _ => Err(Error::Query(QueryError::new(
728                            QueryErrorKind::Semantic,
729                            "viewing_epoch must be a non-negative integer literal",
730                        ))),
731                    }
732                } else {
733                    // For now, store parameter name with Null value.
734                    // Full expression evaluation would require building and executing a plan.
735                    self.set_parameter(&key, Value::Null);
736                    Ok(QueryResult::empty())
737                }
738            }
739            SessionCommand::SessionReset(target) => {
740                use grafeo_adapters::query::gql::ast::SessionResetTarget;
741                match target {
742                    SessionResetTarget::All => self.reset_session(),
743                    SessionResetTarget::Schema => self.reset_schema(),
744                    SessionResetTarget::Graph => self.reset_graph(),
745                    SessionResetTarget::TimeZone => self.reset_time_zone(),
746                    SessionResetTarget::Parameters => self.reset_parameters(),
747                }
748                Ok(QueryResult::empty())
749            }
750            SessionCommand::SessionClose => {
751                self.reset_session();
752                Ok(QueryResult::empty())
753            }
754            SessionCommand::StartTransaction {
755                read_only,
756                isolation_level,
757            } => {
758                let engine_level = isolation_level.map(|l| match l {
759                    TransactionIsolationLevel::ReadCommitted => {
760                        crate::transaction::IsolationLevel::ReadCommitted
761                    }
762                    TransactionIsolationLevel::SnapshotIsolation => {
763                        crate::transaction::IsolationLevel::SnapshotIsolation
764                    }
765                    TransactionIsolationLevel::Serializable => {
766                        crate::transaction::IsolationLevel::Serializable
767                    }
768                });
769                self.begin_transaction_inner(read_only, engine_level)?;
770                Ok(QueryResult::status("Transaction started"))
771            }
772            SessionCommand::Commit => {
773                self.commit_inner()?;
774                Ok(QueryResult::status("Transaction committed"))
775            }
776            SessionCommand::Rollback => {
777                self.rollback_inner()?;
778                Ok(QueryResult::status("Transaction rolled back"))
779            }
780            SessionCommand::Savepoint(name) => {
781                self.savepoint(&name)?;
782                Ok(QueryResult::status(format!("Savepoint '{name}' created")))
783            }
784            SessionCommand::RollbackToSavepoint(name) => {
785                self.rollback_to_savepoint(&name)?;
786                Ok(QueryResult::status(format!(
787                    "Rolled back to savepoint '{name}'"
788                )))
789            }
790            SessionCommand::ReleaseSavepoint(name) => {
791                self.release_savepoint(&name)?;
792                Ok(QueryResult::status(format!("Savepoint '{name}' released")))
793            }
794        }
795    }
796
797    /// Logs a WAL record for a schema change (no-op if WAL is not enabled).
798    #[cfg(feature = "wal")]
799    fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
800        if let Some(ref wal) = self.wal
801            && let Err(e) = wal.log(record)
802        {
803            tracing::warn!("Failed to log schema change to WAL: {}", e);
804        }
805    }
806
807    /// Executes a schema DDL command, returning a status result.
808    #[cfg(feature = "gql")]
809    fn execute_schema_command(
810        &self,
811        cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
812    ) -> Result<QueryResult> {
813        use crate::catalog::{
814            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
815        };
816        use grafeo_adapters::query::gql::ast::SchemaStatement;
817        #[cfg(feature = "wal")]
818        use grafeo_adapters::storage::wal::WalRecord;
819        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
820
821        /// Logs a WAL record for schema changes. Compiles to nothing without `wal`.
822        macro_rules! wal_log {
823            ($self:expr, $record:expr) => {
824                #[cfg(feature = "wal")]
825                $self.log_schema_wal(&$record);
826            };
827        }
828
829        let result = match cmd {
830            SchemaStatement::CreateNodeType(stmt) => {
831                #[cfg(feature = "wal")]
832                let props_for_wal: Vec<(String, String, bool)> = stmt
833                    .properties
834                    .iter()
835                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
836                    .collect();
837                let def = NodeTypeDefinition {
838                    name: stmt.name.clone(),
839                    properties: stmt
840                        .properties
841                        .iter()
842                        .map(|p| TypedProperty {
843                            name: p.name.clone(),
844                            data_type: PropertyDataType::from_type_name(&p.data_type),
845                            nullable: p.nullable,
846                            default_value: p
847                                .default_value
848                                .as_ref()
849                                .map(|s| parse_default_literal(s)),
850                        })
851                        .collect(),
852                    constraints: Vec::new(),
853                    parent_types: stmt.parent_types.clone(),
854                };
855                let result = if stmt.or_replace {
856                    let _ = self.catalog.drop_node_type(&stmt.name);
857                    self.catalog.register_node_type(def)
858                } else {
859                    self.catalog.register_node_type(def)
860                };
861                match result {
862                    Ok(()) => {
863                        wal_log!(
864                            self,
865                            WalRecord::CreateNodeType {
866                                name: stmt.name.clone(),
867                                properties: props_for_wal,
868                                constraints: Vec::new(),
869                            }
870                        );
871                        Ok(QueryResult::status(format!(
872                            "Created node type '{}'",
873                            stmt.name
874                        )))
875                    }
876                    Err(e) if stmt.if_not_exists => {
877                        let _ = e;
878                        Ok(QueryResult::status("No change"))
879                    }
880                    Err(e) => Err(Error::Query(QueryError::new(
881                        QueryErrorKind::Semantic,
882                        e.to_string(),
883                    ))),
884                }
885            }
886            SchemaStatement::CreateEdgeType(stmt) => {
887                #[cfg(feature = "wal")]
888                let props_for_wal: Vec<(String, String, bool)> = stmt
889                    .properties
890                    .iter()
891                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
892                    .collect();
893                let def = EdgeTypeDefinition {
894                    name: stmt.name.clone(),
895                    properties: stmt
896                        .properties
897                        .iter()
898                        .map(|p| TypedProperty {
899                            name: p.name.clone(),
900                            data_type: PropertyDataType::from_type_name(&p.data_type),
901                            nullable: p.nullable,
902                            default_value: p
903                                .default_value
904                                .as_ref()
905                                .map(|s| parse_default_literal(s)),
906                        })
907                        .collect(),
908                    constraints: Vec::new(),
909                    source_node_types: stmt.source_node_types.clone(),
910                    target_node_types: stmt.target_node_types.clone(),
911                };
912                let result = if stmt.or_replace {
913                    let _ = self.catalog.drop_edge_type_def(&stmt.name);
914                    self.catalog.register_edge_type_def(def)
915                } else {
916                    self.catalog.register_edge_type_def(def)
917                };
918                match result {
919                    Ok(()) => {
920                        wal_log!(
921                            self,
922                            WalRecord::CreateEdgeType {
923                                name: stmt.name.clone(),
924                                properties: props_for_wal,
925                                constraints: Vec::new(),
926                            }
927                        );
928                        Ok(QueryResult::status(format!(
929                            "Created edge type '{}'",
930                            stmt.name
931                        )))
932                    }
933                    Err(e) if stmt.if_not_exists => {
934                        let _ = e;
935                        Ok(QueryResult::status("No change"))
936                    }
937                    Err(e) => Err(Error::Query(QueryError::new(
938                        QueryErrorKind::Semantic,
939                        e.to_string(),
940                    ))),
941                }
942            }
943            SchemaStatement::CreateVectorIndex(stmt) => {
944                Self::create_vector_index_on_store(
945                    &self.active_lpg_store(),
946                    &stmt.node_label,
947                    &stmt.property,
948                    stmt.dimensions,
949                    stmt.metric.as_deref(),
950                )?;
951                wal_log!(
952                    self,
953                    WalRecord::CreateIndex {
954                        name: stmt.name.clone(),
955                        label: stmt.node_label.clone(),
956                        property: stmt.property.clone(),
957                        index_type: "vector".to_string(),
958                    }
959                );
960                Ok(QueryResult::status(format!(
961                    "Created vector index '{}'",
962                    stmt.name
963                )))
964            }
965            SchemaStatement::DropNodeType { name, if_exists } => {
966                match self.catalog.drop_node_type(&name) {
967                    Ok(()) => {
968                        wal_log!(self, WalRecord::DropNodeType { name: name.clone() });
969                        Ok(QueryResult::status(format!("Dropped node type '{name}'")))
970                    }
971                    Err(e) if if_exists => {
972                        let _ = e;
973                        Ok(QueryResult::status("No change"))
974                    }
975                    Err(e) => Err(Error::Query(QueryError::new(
976                        QueryErrorKind::Semantic,
977                        e.to_string(),
978                    ))),
979                }
980            }
981            SchemaStatement::DropEdgeType { name, if_exists } => {
982                match self.catalog.drop_edge_type_def(&name) {
983                    Ok(()) => {
984                        wal_log!(self, WalRecord::DropEdgeType { name: name.clone() });
985                        Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
986                    }
987                    Err(e) if if_exists => {
988                        let _ = e;
989                        Ok(QueryResult::status("No change"))
990                    }
991                    Err(e) => Err(Error::Query(QueryError::new(
992                        QueryErrorKind::Semantic,
993                        e.to_string(),
994                    ))),
995                }
996            }
997            SchemaStatement::CreateIndex(stmt) => {
998                use grafeo_adapters::query::gql::ast::IndexKind;
999                let active = self.active_lpg_store();
1000                let index_type_str = match stmt.index_kind {
1001                    IndexKind::Property => "property",
1002                    IndexKind::BTree => "btree",
1003                    IndexKind::Text => "text",
1004                    IndexKind::Vector => "vector",
1005                };
1006                match stmt.index_kind {
1007                    IndexKind::Property | IndexKind::BTree => {
1008                        for prop in &stmt.properties {
1009                            active.create_property_index(prop);
1010                        }
1011                    }
1012                    IndexKind::Text => {
1013                        for prop in &stmt.properties {
1014                            Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1015                        }
1016                    }
1017                    IndexKind::Vector => {
1018                        for prop in &stmt.properties {
1019                            Self::create_vector_index_on_store(
1020                                &active,
1021                                &stmt.label,
1022                                prop,
1023                                stmt.options.dimensions,
1024                                stmt.options.metric.as_deref(),
1025                            )?;
1026                        }
1027                    }
1028                }
1029                #[cfg(feature = "wal")]
1030                for prop in &stmt.properties {
1031                    wal_log!(
1032                        self,
1033                        WalRecord::CreateIndex {
1034                            name: stmt.name.clone(),
1035                            label: stmt.label.clone(),
1036                            property: prop.clone(),
1037                            index_type: index_type_str.to_string(),
1038                        }
1039                    );
1040                }
1041                Ok(QueryResult::status(format!(
1042                    "Created {} index '{}'",
1043                    index_type_str, stmt.name
1044                )))
1045            }
1046            SchemaStatement::DropIndex { name, if_exists } => {
1047                // Try to drop property index by name
1048                let dropped = self.active_lpg_store().drop_property_index(&name);
1049                if dropped || if_exists {
1050                    if dropped {
1051                        wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1052                    }
1053                    Ok(QueryResult::status(if dropped {
1054                        format!("Dropped index '{name}'")
1055                    } else {
1056                        "No change".to_string()
1057                    }))
1058                } else {
1059                    Err(Error::Query(QueryError::new(
1060                        QueryErrorKind::Semantic,
1061                        format!("Index '{name}' does not exist"),
1062                    )))
1063                }
1064            }
1065            SchemaStatement::CreateConstraint(stmt) => {
1066                use crate::catalog::TypeConstraint;
1067                use grafeo_adapters::query::gql::ast::ConstraintKind;
1068                let kind_str = match stmt.constraint_kind {
1069                    ConstraintKind::Unique => "unique",
1070                    ConstraintKind::NodeKey => "node_key",
1071                    ConstraintKind::NotNull => "not_null",
1072                    ConstraintKind::Exists => "exists",
1073                };
1074                let constraint_name = stmt
1075                    .name
1076                    .clone()
1077                    .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1078
1079                // Register constraint in catalog type definitions
1080                match stmt.constraint_kind {
1081                    ConstraintKind::Unique => {
1082                        for prop in &stmt.properties {
1083                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1084                            let prop_id = self.catalog.get_or_create_property_key(prop);
1085                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1086                        }
1087                        let _ = self.catalog.add_constraint_to_type(
1088                            &stmt.label,
1089                            TypeConstraint::Unique(stmt.properties.clone()),
1090                        );
1091                    }
1092                    ConstraintKind::NodeKey => {
1093                        for prop in &stmt.properties {
1094                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1095                            let prop_id = self.catalog.get_or_create_property_key(prop);
1096                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1097                            let _ = self.catalog.add_required_property(label_id, prop_id);
1098                        }
1099                        let _ = self.catalog.add_constraint_to_type(
1100                            &stmt.label,
1101                            TypeConstraint::PrimaryKey(stmt.properties.clone()),
1102                        );
1103                    }
1104                    ConstraintKind::NotNull | ConstraintKind::Exists => {
1105                        for prop in &stmt.properties {
1106                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1107                            let prop_id = self.catalog.get_or_create_property_key(prop);
1108                            let _ = self.catalog.add_required_property(label_id, prop_id);
1109                            let _ = self.catalog.add_constraint_to_type(
1110                                &stmt.label,
1111                                TypeConstraint::NotNull(prop.clone()),
1112                            );
1113                        }
1114                    }
1115                }
1116
1117                wal_log!(
1118                    self,
1119                    WalRecord::CreateConstraint {
1120                        name: constraint_name.clone(),
1121                        label: stmt.label.clone(),
1122                        properties: stmt.properties.clone(),
1123                        kind: kind_str.to_string(),
1124                    }
1125                );
1126                Ok(QueryResult::status(format!(
1127                    "Created {kind_str} constraint '{constraint_name}'"
1128                )))
1129            }
1130            SchemaStatement::DropConstraint { name, if_exists } => {
1131                let _ = if_exists;
1132                wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1133                Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1134            }
1135            SchemaStatement::CreateGraphType(stmt) => {
1136                use crate::catalog::GraphTypeDefinition;
1137                use grafeo_adapters::query::gql::ast::InlineElementType;
1138
1139                // GG04: LIKE clause copies type from existing graph
1140                let (mut node_types, mut edge_types, open) =
1141                    if let Some(ref like_graph) = stmt.like_graph {
1142                        // Infer types from the graph's bound type, or use its existing types
1143                        if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1144                            if let Some(existing) = self
1145                                .catalog
1146                                .schema()
1147                                .and_then(|s| s.get_graph_type(&type_name))
1148                            {
1149                                (
1150                                    existing.allowed_node_types.clone(),
1151                                    existing.allowed_edge_types.clone(),
1152                                    existing.open,
1153                                )
1154                            } else {
1155                                (Vec::new(), Vec::new(), true)
1156                            }
1157                        } else {
1158                            // GG22: Infer from graph data (labels used in graph)
1159                            let nt = self.catalog.all_node_type_names();
1160                            let et = self.catalog.all_edge_type_names();
1161                            if nt.is_empty() && et.is_empty() {
1162                                (Vec::new(), Vec::new(), true)
1163                            } else {
1164                                (nt, et, false)
1165                            }
1166                        }
1167                    } else {
1168                        (stmt.node_types.clone(), stmt.edge_types.clone(), stmt.open)
1169                    };
1170
1171                // GG03: Register inline element types and add their names
1172                for inline in &stmt.inline_types {
1173                    match inline {
1174                        InlineElementType::Node {
1175                            name,
1176                            properties,
1177                            key_labels,
1178                            ..
1179                        } => {
1180                            let def = NodeTypeDefinition {
1181                                name: name.clone(),
1182                                properties: properties
1183                                    .iter()
1184                                    .map(|p| TypedProperty {
1185                                        name: p.name.clone(),
1186                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1187                                        nullable: p.nullable,
1188                                        default_value: None,
1189                                    })
1190                                    .collect(),
1191                                constraints: Vec::new(),
1192                                parent_types: key_labels.clone(),
1193                            };
1194                            // Register or replace so inline defs override existing
1195                            self.catalog.register_or_replace_node_type(def);
1196                            #[cfg(feature = "wal")]
1197                            {
1198                                let props_for_wal: Vec<(String, String, bool)> = properties
1199                                    .iter()
1200                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1201                                    .collect();
1202                                self.log_schema_wal(&WalRecord::CreateNodeType {
1203                                    name: name.clone(),
1204                                    properties: props_for_wal,
1205                                    constraints: Vec::new(),
1206                                });
1207                            }
1208                            if !node_types.contains(name) {
1209                                node_types.push(name.clone());
1210                            }
1211                        }
1212                        InlineElementType::Edge {
1213                            name,
1214                            properties,
1215                            source_node_types,
1216                            target_node_types,
1217                            ..
1218                        } => {
1219                            let def = EdgeTypeDefinition {
1220                                name: name.clone(),
1221                                properties: properties
1222                                    .iter()
1223                                    .map(|p| TypedProperty {
1224                                        name: p.name.clone(),
1225                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1226                                        nullable: p.nullable,
1227                                        default_value: None,
1228                                    })
1229                                    .collect(),
1230                                constraints: Vec::new(),
1231                                source_node_types: source_node_types.clone(),
1232                                target_node_types: target_node_types.clone(),
1233                            };
1234                            self.catalog.register_or_replace_edge_type_def(def);
1235                            #[cfg(feature = "wal")]
1236                            {
1237                                let props_for_wal: Vec<(String, String, bool)> = properties
1238                                    .iter()
1239                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1240                                    .collect();
1241                                self.log_schema_wal(&WalRecord::CreateEdgeType {
1242                                    name: name.clone(),
1243                                    properties: props_for_wal,
1244                                    constraints: Vec::new(),
1245                                });
1246                            }
1247                            if !edge_types.contains(name) {
1248                                edge_types.push(name.clone());
1249                            }
1250                        }
1251                    }
1252                }
1253
1254                let def = GraphTypeDefinition {
1255                    name: stmt.name.clone(),
1256                    allowed_node_types: node_types.clone(),
1257                    allowed_edge_types: edge_types.clone(),
1258                    open,
1259                };
1260                let result = if stmt.or_replace {
1261                    // Drop existing first, ignore error if not found
1262                    let _ = self.catalog.drop_graph_type(&stmt.name);
1263                    self.catalog.register_graph_type(def)
1264                } else {
1265                    self.catalog.register_graph_type(def)
1266                };
1267                match result {
1268                    Ok(()) => {
1269                        wal_log!(
1270                            self,
1271                            WalRecord::CreateGraphType {
1272                                name: stmt.name.clone(),
1273                                node_types,
1274                                edge_types,
1275                                open,
1276                            }
1277                        );
1278                        Ok(QueryResult::status(format!(
1279                            "Created graph type '{}'",
1280                            stmt.name
1281                        )))
1282                    }
1283                    Err(e) if stmt.if_not_exists => {
1284                        let _ = e;
1285                        Ok(QueryResult::status("No change"))
1286                    }
1287                    Err(e) => Err(Error::Query(QueryError::new(
1288                        QueryErrorKind::Semantic,
1289                        e.to_string(),
1290                    ))),
1291                }
1292            }
1293            SchemaStatement::DropGraphType { name, if_exists } => {
1294                match self.catalog.drop_graph_type(&name) {
1295                    Ok(()) => {
1296                        wal_log!(self, WalRecord::DropGraphType { name: name.clone() });
1297                        Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1298                    }
1299                    Err(e) if if_exists => {
1300                        let _ = e;
1301                        Ok(QueryResult::status("No change"))
1302                    }
1303                    Err(e) => Err(Error::Query(QueryError::new(
1304                        QueryErrorKind::Semantic,
1305                        e.to_string(),
1306                    ))),
1307                }
1308            }
1309            SchemaStatement::CreateSchema {
1310                name,
1311                if_not_exists,
1312            } => match self.catalog.register_schema_namespace(name.clone()) {
1313                Ok(()) => {
1314                    wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1315                    Ok(QueryResult::status(format!("Created schema '{name}'")))
1316                }
1317                Err(e) if if_not_exists => {
1318                    let _ = e;
1319                    Ok(QueryResult::status("No change"))
1320                }
1321                Err(e) => Err(Error::Query(QueryError::new(
1322                    QueryErrorKind::Semantic,
1323                    e.to_string(),
1324                ))),
1325            },
1326            SchemaStatement::DropSchema { name, if_exists } => {
1327                // ISO/IEC 39075 Section 12.3: schema must be empty before dropping
1328                let prefix = format!("{name}/");
1329                let has_graphs = self
1330                    .store
1331                    .graph_names()
1332                    .iter()
1333                    .any(|g| g.starts_with(&prefix));
1334                if has_graphs {
1335                    return Err(Error::Query(QueryError::new(
1336                        QueryErrorKind::Semantic,
1337                        format!(
1338                            "Schema '{name}' is not empty: drop all graphs in the schema first"
1339                        ),
1340                    )));
1341                }
1342                match self.catalog.drop_schema_namespace(&name) {
1343                    Ok(()) => {
1344                        wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1345                        // If this session was using the dropped schema, reset it
1346                        let mut current = self.current_schema.lock();
1347                        if current
1348                            .as_deref()
1349                            .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1350                        {
1351                            *current = None;
1352                        }
1353                        Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1354                    }
1355                    Err(e) if if_exists => {
1356                        let _ = e;
1357                        Ok(QueryResult::status("No change"))
1358                    }
1359                    Err(e) => Err(Error::Query(QueryError::new(
1360                        QueryErrorKind::Semantic,
1361                        e.to_string(),
1362                    ))),
1363                }
1364            }
1365            SchemaStatement::AlterNodeType(stmt) => {
1366                use grafeo_adapters::query::gql::ast::TypeAlteration;
1367                let mut wal_alts = Vec::new();
1368                for alt in &stmt.alterations {
1369                    match alt {
1370                        TypeAlteration::AddProperty(prop) => {
1371                            let typed = TypedProperty {
1372                                name: prop.name.clone(),
1373                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1374                                nullable: prop.nullable,
1375                                default_value: prop
1376                                    .default_value
1377                                    .as_ref()
1378                                    .map(|s| parse_default_literal(s)),
1379                            };
1380                            self.catalog
1381                                .alter_node_type_add_property(&stmt.name, typed)
1382                                .map_err(|e| {
1383                                    Error::Query(QueryError::new(
1384                                        QueryErrorKind::Semantic,
1385                                        e.to_string(),
1386                                    ))
1387                                })?;
1388                            wal_alts.push((
1389                                "add".to_string(),
1390                                prop.name.clone(),
1391                                prop.data_type.clone(),
1392                                prop.nullable,
1393                            ));
1394                        }
1395                        TypeAlteration::DropProperty(name) => {
1396                            self.catalog
1397                                .alter_node_type_drop_property(&stmt.name, name)
1398                                .map_err(|e| {
1399                                    Error::Query(QueryError::new(
1400                                        QueryErrorKind::Semantic,
1401                                        e.to_string(),
1402                                    ))
1403                                })?;
1404                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1405                        }
1406                    }
1407                }
1408                wal_log!(
1409                    self,
1410                    WalRecord::AlterNodeType {
1411                        name: stmt.name.clone(),
1412                        alterations: wal_alts,
1413                    }
1414                );
1415                Ok(QueryResult::status(format!(
1416                    "Altered node type '{}'",
1417                    stmt.name
1418                )))
1419            }
1420            SchemaStatement::AlterEdgeType(stmt) => {
1421                use grafeo_adapters::query::gql::ast::TypeAlteration;
1422                let mut wal_alts = Vec::new();
1423                for alt in &stmt.alterations {
1424                    match alt {
1425                        TypeAlteration::AddProperty(prop) => {
1426                            let typed = TypedProperty {
1427                                name: prop.name.clone(),
1428                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1429                                nullable: prop.nullable,
1430                                default_value: prop
1431                                    .default_value
1432                                    .as_ref()
1433                                    .map(|s| parse_default_literal(s)),
1434                            };
1435                            self.catalog
1436                                .alter_edge_type_add_property(&stmt.name, typed)
1437                                .map_err(|e| {
1438                                    Error::Query(QueryError::new(
1439                                        QueryErrorKind::Semantic,
1440                                        e.to_string(),
1441                                    ))
1442                                })?;
1443                            wal_alts.push((
1444                                "add".to_string(),
1445                                prop.name.clone(),
1446                                prop.data_type.clone(),
1447                                prop.nullable,
1448                            ));
1449                        }
1450                        TypeAlteration::DropProperty(name) => {
1451                            self.catalog
1452                                .alter_edge_type_drop_property(&stmt.name, name)
1453                                .map_err(|e| {
1454                                    Error::Query(QueryError::new(
1455                                        QueryErrorKind::Semantic,
1456                                        e.to_string(),
1457                                    ))
1458                                })?;
1459                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1460                        }
1461                    }
1462                }
1463                wal_log!(
1464                    self,
1465                    WalRecord::AlterEdgeType {
1466                        name: stmt.name.clone(),
1467                        alterations: wal_alts,
1468                    }
1469                );
1470                Ok(QueryResult::status(format!(
1471                    "Altered edge type '{}'",
1472                    stmt.name
1473                )))
1474            }
1475            SchemaStatement::AlterGraphType(stmt) => {
1476                use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1477                let mut wal_alts = Vec::new();
1478                for alt in &stmt.alterations {
1479                    match alt {
1480                        GraphTypeAlteration::AddNodeType(name) => {
1481                            self.catalog
1482                                .alter_graph_type_add_node_type(&stmt.name, name.clone())
1483                                .map_err(|e| {
1484                                    Error::Query(QueryError::new(
1485                                        QueryErrorKind::Semantic,
1486                                        e.to_string(),
1487                                    ))
1488                                })?;
1489                            wal_alts.push(("add_node_type".to_string(), name.clone()));
1490                        }
1491                        GraphTypeAlteration::DropNodeType(name) => {
1492                            self.catalog
1493                                .alter_graph_type_drop_node_type(&stmt.name, name)
1494                                .map_err(|e| {
1495                                    Error::Query(QueryError::new(
1496                                        QueryErrorKind::Semantic,
1497                                        e.to_string(),
1498                                    ))
1499                                })?;
1500                            wal_alts.push(("drop_node_type".to_string(), name.clone()));
1501                        }
1502                        GraphTypeAlteration::AddEdgeType(name) => {
1503                            self.catalog
1504                                .alter_graph_type_add_edge_type(&stmt.name, name.clone())
1505                                .map_err(|e| {
1506                                    Error::Query(QueryError::new(
1507                                        QueryErrorKind::Semantic,
1508                                        e.to_string(),
1509                                    ))
1510                                })?;
1511                            wal_alts.push(("add_edge_type".to_string(), name.clone()));
1512                        }
1513                        GraphTypeAlteration::DropEdgeType(name) => {
1514                            self.catalog
1515                                .alter_graph_type_drop_edge_type(&stmt.name, name)
1516                                .map_err(|e| {
1517                                    Error::Query(QueryError::new(
1518                                        QueryErrorKind::Semantic,
1519                                        e.to_string(),
1520                                    ))
1521                                })?;
1522                            wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1523                        }
1524                    }
1525                }
1526                wal_log!(
1527                    self,
1528                    WalRecord::AlterGraphType {
1529                        name: stmt.name.clone(),
1530                        alterations: wal_alts,
1531                    }
1532                );
1533                Ok(QueryResult::status(format!(
1534                    "Altered graph type '{}'",
1535                    stmt.name
1536                )))
1537            }
1538            SchemaStatement::CreateProcedure(stmt) => {
1539                use crate::catalog::ProcedureDefinition;
1540
1541                let def = ProcedureDefinition {
1542                    name: stmt.name.clone(),
1543                    params: stmt
1544                        .params
1545                        .iter()
1546                        .map(|p| (p.name.clone(), p.param_type.clone()))
1547                        .collect(),
1548                    returns: stmt
1549                        .returns
1550                        .iter()
1551                        .map(|r| (r.name.clone(), r.return_type.clone()))
1552                        .collect(),
1553                    body: stmt.body.clone(),
1554                };
1555
1556                if stmt.or_replace {
1557                    self.catalog.replace_procedure(def).map_err(|e| {
1558                        Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1559                    })?;
1560                } else {
1561                    match self.catalog.register_procedure(def) {
1562                        Ok(()) => {}
1563                        Err(_) if stmt.if_not_exists => {
1564                            return Ok(QueryResult::empty());
1565                        }
1566                        Err(e) => {
1567                            return Err(Error::Query(QueryError::new(
1568                                QueryErrorKind::Semantic,
1569                                e.to_string(),
1570                            )));
1571                        }
1572                    }
1573                }
1574
1575                wal_log!(
1576                    self,
1577                    WalRecord::CreateProcedure {
1578                        name: stmt.name.clone(),
1579                        params: stmt
1580                            .params
1581                            .iter()
1582                            .map(|p| (p.name.clone(), p.param_type.clone()))
1583                            .collect(),
1584                        returns: stmt
1585                            .returns
1586                            .iter()
1587                            .map(|r| (r.name.clone(), r.return_type.clone()))
1588                            .collect(),
1589                        body: stmt.body,
1590                    }
1591                );
1592                Ok(QueryResult::status(format!(
1593                    "Created procedure '{}'",
1594                    stmt.name
1595                )))
1596            }
1597            SchemaStatement::DropProcedure { name, if_exists } => {
1598                match self.catalog.drop_procedure(&name) {
1599                    Ok(()) => {}
1600                    Err(_) if if_exists => {
1601                        return Ok(QueryResult::empty());
1602                    }
1603                    Err(e) => {
1604                        return Err(Error::Query(QueryError::new(
1605                            QueryErrorKind::Semantic,
1606                            e.to_string(),
1607                        )));
1608                    }
1609                }
1610                wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1611                Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1612            }
1613            SchemaStatement::ShowIndexes => {
1614                return self.execute_show_indexes();
1615            }
1616            SchemaStatement::ShowConstraints => {
1617                return self.execute_show_constraints();
1618            }
1619            SchemaStatement::ShowNodeTypes => {
1620                return self.execute_show_node_types();
1621            }
1622            SchemaStatement::ShowEdgeTypes => {
1623                return self.execute_show_edge_types();
1624            }
1625            SchemaStatement::ShowGraphTypes => {
1626                return self.execute_show_graph_types();
1627            }
1628            SchemaStatement::ShowGraphType(name) => {
1629                return self.execute_show_graph_type(&name);
1630            }
1631            SchemaStatement::ShowCurrentGraphType => {
1632                return self.execute_show_current_graph_type();
1633            }
1634            SchemaStatement::ShowGraphs => {
1635                return self.execute_show_graphs();
1636            }
1637            SchemaStatement::ShowSchemas => {
1638                return self.execute_show_schemas();
1639            }
1640        };
1641
1642        // Invalidate all cached query plans after any successful DDL change.
1643        // DDL is rare, so clearing the entire cache is cheap and correct.
1644        if result.is_ok() {
1645            self.query_cache.clear();
1646        }
1647
1648        result
1649    }
1650
1651    /// Creates a vector index on the store by scanning existing nodes.
1652    #[cfg(all(feature = "gql", feature = "vector-index"))]
1653    fn create_vector_index_on_store(
1654        store: &LpgStore,
1655        label: &str,
1656        property: &str,
1657        dimensions: Option<usize>,
1658        metric: Option<&str>,
1659    ) -> Result<()> {
1660        use grafeo_common::types::{PropertyKey, Value};
1661        use grafeo_common::utils::error::Error;
1662        use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};
1663
1664        let metric = match metric {
1665            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1666                Error::Internal(format!(
1667                    "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
1668                ))
1669            })?,
1670            None => DistanceMetric::Cosine,
1671        };
1672
1673        let prop_key = PropertyKey::new(property);
1674        let mut found_dims: Option<usize> = dimensions;
1675        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1676
1677        for node in store.nodes_with_label(label) {
1678            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1679                if let Some(expected) = found_dims {
1680                    if v.len() != expected {
1681                        return Err(Error::Internal(format!(
1682                            "Vector dimension mismatch: expected {expected}, found {} on node {}",
1683                            v.len(),
1684                            node.id.0
1685                        )));
1686                    }
1687                } else {
1688                    found_dims = Some(v.len());
1689                }
1690                vectors.push((node.id, v.to_vec()));
1691            }
1692        }
1693
1694        let Some(dims) = found_dims else {
1695            return Err(Error::Internal(format!(
1696                "No vector properties found on :{label}({property}) and no dimensions specified"
1697            )));
1698        };
1699
1700        let config = HnswConfig::new(dims, metric);
1701        let index = HnswIndex::with_capacity(config, vectors.len());
1702        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
1703        for (node_id, vec) in &vectors {
1704            index.insert(*node_id, vec, &accessor);
1705        }
1706
1707        store.add_vector_index(label, property, Arc::new(index));
1708        Ok(())
1709    }
1710
1711    /// Stub for when vector-index feature is not enabled.
1712    #[cfg(all(feature = "gql", not(feature = "vector-index")))]
1713    fn create_vector_index_on_store(
1714        _store: &LpgStore,
1715        _label: &str,
1716        _property: &str,
1717        _dimensions: Option<usize>,
1718        _metric: Option<&str>,
1719    ) -> Result<()> {
1720        Err(grafeo_common::utils::error::Error::Internal(
1721            "Vector index support requires the 'vector-index' feature".to_string(),
1722        ))
1723    }
1724
1725    /// Creates a text index on the store by scanning existing nodes.
1726    #[cfg(all(feature = "gql", feature = "text-index"))]
1727    fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
1728        use grafeo_common::types::{PropertyKey, Value};
1729        use grafeo_core::index::text::{BM25Config, InvertedIndex};
1730
1731        let mut index = InvertedIndex::new(BM25Config::default());
1732        let prop_key = PropertyKey::new(property);
1733
1734        let nodes = store.nodes_by_label(label);
1735        for node_id in nodes {
1736            if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
1737                index.insert(node_id, text.as_str());
1738            }
1739        }
1740
1741        store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
1742        Ok(())
1743    }
1744
1745    /// Stub for when text-index feature is not enabled.
1746    #[cfg(all(feature = "gql", not(feature = "text-index")))]
1747    fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
1748        Err(grafeo_common::utils::error::Error::Internal(
1749            "Text index support requires the 'text-index' feature".to_string(),
1750        ))
1751    }
1752
1753    /// Returns a table of all indexes from the catalog.
1754    fn execute_show_indexes(&self) -> Result<QueryResult> {
1755        let indexes = self.catalog.all_indexes();
1756        let columns = vec![
1757            "name".to_string(),
1758            "type".to_string(),
1759            "label".to_string(),
1760            "property".to_string(),
1761        ];
1762        let rows: Vec<Vec<Value>> = indexes
1763            .into_iter()
1764            .map(|def| {
1765                let label_name = self
1766                    .catalog
1767                    .get_label_name(def.label)
1768                    .unwrap_or_else(|| "?".into());
1769                let prop_name = self
1770                    .catalog
1771                    .get_property_key_name(def.property_key)
1772                    .unwrap_or_else(|| "?".into());
1773                vec![
1774                    Value::from(format!("idx_{}_{}", label_name, prop_name)),
1775                    Value::from(format!("{:?}", def.index_type)),
1776                    Value::from(&*label_name),
1777                    Value::from(&*prop_name),
1778                ]
1779            })
1780            .collect();
1781        Ok(QueryResult {
1782            columns,
1783            column_types: Vec::new(),
1784            rows,
1785            ..QueryResult::empty()
1786        })
1787    }
1788
1789    /// Returns a table of all constraints (currently metadata-only).
1790    fn execute_show_constraints(&self) -> Result<QueryResult> {
1791        // Constraints are tracked in WAL but not yet in a queryable catalog.
1792        // Return an empty table with the expected schema.
1793        Ok(QueryResult {
1794            columns: vec![
1795                "name".to_string(),
1796                "type".to_string(),
1797                "label".to_string(),
1798                "properties".to_string(),
1799            ],
1800            column_types: Vec::new(),
1801            rows: Vec::new(),
1802            ..QueryResult::empty()
1803        })
1804    }
1805
1806    /// Returns a table of all registered node types.
1807    fn execute_show_node_types(&self) -> Result<QueryResult> {
1808        let columns = vec![
1809            "name".to_string(),
1810            "properties".to_string(),
1811            "constraints".to_string(),
1812            "parents".to_string(),
1813        ];
1814        let type_names = self.catalog.all_node_type_names();
1815        let rows: Vec<Vec<Value>> = type_names
1816            .into_iter()
1817            .filter_map(|name| {
1818                let def = self.catalog.get_node_type(&name)?;
1819                let props: Vec<String> = def
1820                    .properties
1821                    .iter()
1822                    .map(|p| {
1823                        let nullable = if p.nullable { "" } else { " NOT NULL" };
1824                        format!("{} {}{}", p.name, p.data_type, nullable)
1825                    })
1826                    .collect();
1827                let constraints: Vec<String> =
1828                    def.constraints.iter().map(|c| format!("{c:?}")).collect();
1829                let parents = def.parent_types.join(", ");
1830                Some(vec![
1831                    Value::from(name),
1832                    Value::from(props.join(", ")),
1833                    Value::from(constraints.join(", ")),
1834                    Value::from(parents),
1835                ])
1836            })
1837            .collect();
1838        Ok(QueryResult {
1839            columns,
1840            column_types: Vec::new(),
1841            rows,
1842            ..QueryResult::empty()
1843        })
1844    }
1845
1846    /// Returns a table of all registered edge types.
1847    fn execute_show_edge_types(&self) -> Result<QueryResult> {
1848        let columns = vec![
1849            "name".to_string(),
1850            "properties".to_string(),
1851            "source_types".to_string(),
1852            "target_types".to_string(),
1853        ];
1854        let type_names = self.catalog.all_edge_type_names();
1855        let rows: Vec<Vec<Value>> = type_names
1856            .into_iter()
1857            .filter_map(|name| {
1858                let def = self.catalog.get_edge_type_def(&name)?;
1859                let props: Vec<String> = def
1860                    .properties
1861                    .iter()
1862                    .map(|p| {
1863                        let nullable = if p.nullable { "" } else { " NOT NULL" };
1864                        format!("{} {}{}", p.name, p.data_type, nullable)
1865                    })
1866                    .collect();
1867                let src = def.source_node_types.join(", ");
1868                let tgt = def.target_node_types.join(", ");
1869                Some(vec![
1870                    Value::from(name),
1871                    Value::from(props.join(", ")),
1872                    Value::from(src),
1873                    Value::from(tgt),
1874                ])
1875            })
1876            .collect();
1877        Ok(QueryResult {
1878            columns,
1879            column_types: Vec::new(),
1880            rows,
1881            ..QueryResult::empty()
1882        })
1883    }
1884
1885    /// Returns a table of all registered graph types.
1886    fn execute_show_graph_types(&self) -> Result<QueryResult> {
1887        let columns = vec![
1888            "name".to_string(),
1889            "open".to_string(),
1890            "node_types".to_string(),
1891            "edge_types".to_string(),
1892        ];
1893        let type_names = self.catalog.all_graph_type_names();
1894        let rows: Vec<Vec<Value>> = type_names
1895            .into_iter()
1896            .filter_map(|name| {
1897                let def = self.catalog.get_graph_type_def(&name)?;
1898                Some(vec![
1899                    Value::from(name),
1900                    Value::from(def.open),
1901                    Value::from(def.allowed_node_types.join(", ")),
1902                    Value::from(def.allowed_edge_types.join(", ")),
1903                ])
1904            })
1905            .collect();
1906        Ok(QueryResult {
1907            columns,
1908            column_types: Vec::new(),
1909            rows,
1910            ..QueryResult::empty()
1911        })
1912    }
1913
1914    /// Returns the list of named graphs visible in the current schema context.
1915    ///
1916    /// When a session schema is set, only graphs belonging to that schema are
1917    /// shown (their compound prefix is stripped). When no schema is set, graphs
1918    /// without a schema prefix are shown (the default schema).
1919    fn execute_show_graphs(&self) -> Result<QueryResult> {
1920        let schema = self.current_schema.lock().clone();
1921        let all_names = self.store.graph_names();
1922
1923        let mut names: Vec<String> = match &schema {
1924            Some(s) => {
1925                let prefix = format!("{s}/");
1926                all_names
1927                    .into_iter()
1928                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
1929                    .collect()
1930            }
1931            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
1932        };
1933        names.sort();
1934
1935        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
1936        Ok(QueryResult {
1937            columns: vec!["name".to_string()],
1938            column_types: Vec::new(),
1939            rows,
1940            ..QueryResult::empty()
1941        })
1942    }
1943
1944    /// Returns the list of all schema namespaces.
1945    fn execute_show_schemas(&self) -> Result<QueryResult> {
1946        let mut names = self.catalog.schema_names();
1947        names.sort();
1948        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
1949        Ok(QueryResult {
1950            columns: vec!["name".to_string()],
1951            column_types: Vec::new(),
1952            rows,
1953            ..QueryResult::empty()
1954        })
1955    }
1956
1957    /// Returns detailed info for a specific graph type.
1958    fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
1959        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
1960
1961        let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
1962            Error::Query(QueryError::new(
1963                QueryErrorKind::Semantic,
1964                format!("Graph type '{name}' not found"),
1965            ))
1966        })?;
1967
1968        let columns = vec![
1969            "name".to_string(),
1970            "open".to_string(),
1971            "node_types".to_string(),
1972            "edge_types".to_string(),
1973        ];
1974        let rows = vec![vec![
1975            Value::from(def.name),
1976            Value::from(def.open),
1977            Value::from(def.allowed_node_types.join(", ")),
1978            Value::from(def.allowed_edge_types.join(", ")),
1979        ]];
1980        Ok(QueryResult {
1981            columns,
1982            column_types: Vec::new(),
1983            rows,
1984            ..QueryResult::empty()
1985        })
1986    }
1987
1988    /// Returns the graph type bound to the current graph.
1989    fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
1990        let graph_name = self
1991            .current_graph()
1992            .unwrap_or_else(|| "default".to_string());
1993        let columns = vec![
1994            "graph".to_string(),
1995            "graph_type".to_string(),
1996            "open".to_string(),
1997            "node_types".to_string(),
1998            "edge_types".to_string(),
1999        ];
2000
2001        if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
2002            && let Some(def) = self.catalog.get_graph_type_def(&type_name)
2003        {
2004            let rows = vec![vec![
2005                Value::from(graph_name),
2006                Value::from(type_name),
2007                Value::from(def.open),
2008                Value::from(def.allowed_node_types.join(", ")),
2009                Value::from(def.allowed_edge_types.join(", ")),
2010            ]];
2011            return Ok(QueryResult {
2012                columns,
2013                column_types: Vec::new(),
2014                rows,
2015                ..QueryResult::empty()
2016            });
2017        }
2018
2019        // No graph type binding found
2020        Ok(QueryResult {
2021            columns,
2022            column_types: Vec::new(),
2023            rows: vec![vec![
2024                Value::from(graph_name),
2025                Value::Null,
2026                Value::Null,
2027                Value::Null,
2028                Value::Null,
2029            ]],
2030            ..QueryResult::empty()
2031        })
2032    }
2033
2034    /// Executes a GQL query.
2035    ///
2036    /// # Errors
2037    ///
2038    /// Returns an error if the query fails to parse or execute.
2039    ///
2040    /// # Examples
2041    ///
2042    /// ```no_run
2043    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2044    /// use grafeo_engine::GrafeoDB;
2045    ///
2046    /// let db = GrafeoDB::new_in_memory();
2047    /// let session = db.session();
2048    ///
2049    /// // Create a node
2050    /// session.execute("INSERT (:Person {name: 'Alix', age: 30})")?;
2051    ///
2052    /// // Query nodes
2053    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
2054    /// for row in &result.rows {
2055    ///     println!("{:?}", row);
2056    /// }
2057    /// # Ok(())
2058    /// # }
2059    /// ```
2060    #[cfg(feature = "gql")]
2061    pub fn execute(&self, query: &str) -> Result<QueryResult> {
2062        self.require_lpg("GQL")?;
2063
2064        use crate::query::{
2065            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2066            processor::QueryLanguage, translators::gql,
2067        };
2068
2069        let _span = tracing::info_span!(
2070            "grafeo::session::execute",
2071            language = "gql",
2072            query_len = query.len(),
2073        )
2074        .entered();
2075
2076        #[cfg(not(target_arch = "wasm32"))]
2077        let start_time = std::time::Instant::now();
2078
2079        // Parse and translate, checking for session/schema commands first
2080        let translation = gql::translate_full(query)?;
2081        let logical_plan = match translation {
2082            gql::GqlTranslationResult::SessionCommand(cmd) => {
2083                return self.execute_session_command(cmd);
2084            }
2085            gql::GqlTranslationResult::SchemaCommand(cmd) => {
2086                // All DDL is a write operation
2087                if *self.read_only_tx.lock() {
2088                    return Err(grafeo_common::utils::error::Error::Transaction(
2089                        grafeo_common::utils::error::TransactionError::ReadOnly,
2090                    ));
2091                }
2092                return self.execute_schema_command(cmd);
2093            }
2094            gql::GqlTranslationResult::Plan(plan) => {
2095                // Block mutations in read-only transactions
2096                if *self.read_only_tx.lock() && plan.root.has_mutations() {
2097                    return Err(grafeo_common::utils::error::Error::Transaction(
2098                        grafeo_common::utils::error::TransactionError::ReadOnly,
2099                    ));
2100                }
2101                plan
2102            }
2103        };
2104
2105        // Create cache key for this query
2106        let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
2107
2108        // Try to get cached optimized plan, or use the plan we just translated
2109        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2110            cached_plan
2111        } else {
2112            // Semantic validation
2113            let mut binder = Binder::new();
2114            let _binding_context = binder.bind(&logical_plan)?;
2115
2116            // Optimize the plan
2117            let active = self.active_store();
2118            let optimizer = Optimizer::from_graph_store(&*active);
2119            let plan = optimizer.optimize(logical_plan)?;
2120
2121            // Cache the optimized plan for future use
2122            self.query_cache.put_optimized(cache_key, plan.clone());
2123
2124            plan
2125        };
2126
2127        // Resolve the active store for query execution
2128        let active = self.active_store();
2129
2130        // EXPLAIN: annotate pushdown hints and return the plan tree
2131        if optimized_plan.explain {
2132            use crate::query::processor::{annotate_pushdown_hints, explain_result};
2133            let mut plan = optimized_plan;
2134            annotate_pushdown_hints(&mut plan.root, active.as_ref());
2135            return Ok(explain_result(&plan));
2136        }
2137
2138        // PROFILE: execute with per-operator instrumentation
2139        if optimized_plan.profile {
2140            let has_mutations = optimized_plan.root.has_mutations();
2141            return self.with_auto_commit(has_mutations, || {
2142                let (viewing_epoch, transaction_id) = self.get_transaction_context();
2143                let planner = self.create_planner_for_store(
2144                    Arc::clone(&active),
2145                    viewing_epoch,
2146                    transaction_id,
2147                );
2148                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2149
2150                let executor = Executor::with_columns(physical_plan.columns.clone())
2151                    .with_deadline(self.query_deadline());
2152                let _result = executor.execute(physical_plan.operator.as_mut())?;
2153
2154                let total_time_ms;
2155                #[cfg(not(target_arch = "wasm32"))]
2156                {
2157                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2158                }
2159                #[cfg(target_arch = "wasm32")]
2160                {
2161                    total_time_ms = 0.0;
2162                }
2163
2164                let profile_tree = crate::query::profile::build_profile_tree(
2165                    &optimized_plan.root,
2166                    &mut entries.into_iter(),
2167                );
2168                Ok(crate::query::profile::profile_result(
2169                    &profile_tree,
2170                    total_time_ms,
2171                ))
2172            });
2173        }
2174
2175        let has_mutations = optimized_plan.root.has_mutations();
2176
2177        let result = self.with_auto_commit(has_mutations, || {
2178            // Get transaction context for MVCC visibility
2179            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2180
2181            // Convert to physical plan with transaction context
2182            // (Physical planning cannot be cached as it depends on transaction state)
2183            // Safe to use read-only fast path when: this query has no mutations AND
2184            // there is no active transaction that may have prior uncommitted writes.
2185            let has_active_tx = self.current_transaction.lock().is_some();
2186            let read_only = !has_mutations && !has_active_tx;
2187            let planner = self.create_planner_for_store_with_read_only(
2188                Arc::clone(&active),
2189                viewing_epoch,
2190                transaction_id,
2191                read_only,
2192            );
2193            let mut physical_plan = planner.plan(&optimized_plan)?;
2194
2195            // Execute the plan
2196            let executor = Executor::with_columns(physical_plan.columns.clone())
2197                .with_deadline(self.query_deadline());
2198            let mut result = executor.execute(physical_plan.operator.as_mut())?;
2199
2200            // Add execution metrics
2201            let rows_scanned = result.rows.len() as u64;
2202            #[cfg(not(target_arch = "wasm32"))]
2203            {
2204                let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2205                result.execution_time_ms = Some(elapsed_ms);
2206            }
2207            result.rows_scanned = Some(rows_scanned);
2208
2209            Ok(result)
2210        });
2211
2212        // Record metrics for this query execution.
2213        #[cfg(feature = "metrics")]
2214        {
2215            #[cfg(not(target_arch = "wasm32"))]
2216            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2217            #[cfg(target_arch = "wasm32")]
2218            let elapsed_ms = None;
2219            self.record_query_metrics("gql", elapsed_ms, &result);
2220        }
2221
2222        result
2223    }
2224
2225    /// Executes a GQL query with visibility at the specified epoch.
2226    ///
2227    /// This enables time-travel queries: the query sees the database
2228    /// as it existed at the given epoch.
2229    ///
2230    /// # Errors
2231    ///
2232    /// Returns an error if parsing or execution fails.
2233    #[cfg(feature = "gql")]
2234    pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
2235        let previous = self.viewing_epoch_override.lock().replace(epoch);
2236        let result = self.execute(query);
2237        *self.viewing_epoch_override.lock() = previous;
2238        result
2239    }
2240
2241    /// Executes a GQL query at a specific epoch with optional parameters.
2242    ///
2243    /// Combines epoch-based time travel with parameterized queries.
2244    ///
2245    /// # Errors
2246    ///
2247    /// Returns an error if parsing or execution fails.
2248    #[cfg(feature = "gql")]
2249    pub fn execute_at_epoch_with_params(
2250        &self,
2251        query: &str,
2252        epoch: EpochId,
2253        params: Option<std::collections::HashMap<String, Value>>,
2254    ) -> Result<QueryResult> {
2255        let previous = self.viewing_epoch_override.lock().replace(epoch);
2256        let result = if let Some(p) = params {
2257            self.execute_with_params(query, p)
2258        } else {
2259            self.execute(query)
2260        };
2261        *self.viewing_epoch_override.lock() = previous;
2262        result
2263    }
2264
2265    /// Executes a GQL query with parameters.
2266    ///
2267    /// # Errors
2268    ///
2269    /// Returns an error if the query fails to parse or execute.
2270    #[cfg(feature = "gql")]
2271    pub fn execute_with_params(
2272        &self,
2273        query: &str,
2274        params: std::collections::HashMap<String, Value>,
2275    ) -> Result<QueryResult> {
2276        self.require_lpg("GQL")?;
2277
2278        use crate::query::processor::{QueryLanguage, QueryProcessor};
2279
2280        let has_mutations = Self::query_looks_like_mutation(query);
2281        let active = self.active_store();
2282
2283        self.with_auto_commit(has_mutations, || {
2284            // Get transaction context for MVCC visibility
2285            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2286
2287            // Create processor with transaction context
2288            let processor = QueryProcessor::for_graph_store_with_transaction(
2289                Arc::clone(&active),
2290                Arc::clone(&self.transaction_manager),
2291            )?;
2292
2293            // Apply transaction context if in a transaction
2294            let processor = if let Some(transaction_id) = transaction_id {
2295                processor.with_transaction_context(viewing_epoch, transaction_id)
2296            } else {
2297                processor
2298            };
2299
2300            processor.process(query, QueryLanguage::Gql, Some(&params))
2301        })
2302    }
2303
2304    /// Executes a GQL query with parameters.
2305    ///
2306    /// # Errors
2307    ///
2308    /// Returns an error if no query language is enabled.
2309    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2310    pub fn execute_with_params(
2311        &self,
2312        _query: &str,
2313        _params: std::collections::HashMap<String, Value>,
2314    ) -> Result<QueryResult> {
2315        Err(grafeo_common::utils::error::Error::Internal(
2316            "No query language enabled".to_string(),
2317        ))
2318    }
2319
2320    /// Executes a GQL query.
2321    ///
2322    /// # Errors
2323    ///
2324    /// Returns an error if no query language is enabled.
2325    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2326    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2327        Err(grafeo_common::utils::error::Error::Internal(
2328            "No query language enabled".to_string(),
2329        ))
2330    }
2331
2332    /// Executes a Cypher query.
2333    ///
2334    /// # Errors
2335    ///
2336    /// Returns an error if the query fails to parse or execute.
2337    #[cfg(feature = "cypher")]
2338    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
2339        use crate::query::{
2340            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2341            processor::QueryLanguage, translators::cypher,
2342        };
2343        use grafeo_common::utils::error::{Error as GrafeoError, QueryError, QueryErrorKind};
2344
2345        // Handle schema DDL and SHOW commands before the normal query path
2346        let translation = cypher::translate_full(query)?;
2347        match translation {
2348            cypher::CypherTranslationResult::SchemaCommand(cmd) => {
2349                if *self.read_only_tx.lock() {
2350                    return Err(GrafeoError::Query(QueryError::new(
2351                        QueryErrorKind::Semantic,
2352                        "Cannot execute schema DDL in a read-only transaction",
2353                    )));
2354                }
2355                return self.execute_schema_command(cmd);
2356            }
2357            cypher::CypherTranslationResult::ShowIndexes => {
2358                return self.execute_show_indexes();
2359            }
2360            cypher::CypherTranslationResult::ShowConstraints => {
2361                return self.execute_show_constraints();
2362            }
2363            cypher::CypherTranslationResult::ShowCurrentGraphType => {
2364                return self.execute_show_current_graph_type();
2365            }
2366            cypher::CypherTranslationResult::Plan(_) => {
2367                // Fall through to normal execution below
2368            }
2369        }
2370
2371        #[cfg(not(target_arch = "wasm32"))]
2372        let start_time = std::time::Instant::now();
2373
2374        // Create cache key for this query
2375        let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());
2376
2377        // Try to get cached optimized plan
2378        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2379            cached_plan
2380        } else {
2381            // Parse and translate the query to a logical plan
2382            let logical_plan = cypher::translate(query)?;
2383
2384            // Semantic validation
2385            let mut binder = Binder::new();
2386            let _binding_context = binder.bind(&logical_plan)?;
2387
2388            // Optimize the plan
2389            let active = self.active_store();
2390            let optimizer = Optimizer::from_graph_store(&*active);
2391            let plan = optimizer.optimize(logical_plan)?;
2392
2393            // Cache the optimized plan
2394            self.query_cache.put_optimized(cache_key, plan.clone());
2395
2396            plan
2397        };
2398
2399        // Resolve the active store for query execution
2400        let active = self.active_store();
2401
2402        // EXPLAIN
2403        if optimized_plan.explain {
2404            use crate::query::processor::{annotate_pushdown_hints, explain_result};
2405            let mut plan = optimized_plan;
2406            annotate_pushdown_hints(&mut plan.root, active.as_ref());
2407            return Ok(explain_result(&plan));
2408        }
2409
2410        // PROFILE
2411        if optimized_plan.profile {
2412            let has_mutations = optimized_plan.root.has_mutations();
2413            return self.with_auto_commit(has_mutations, || {
2414                let (viewing_epoch, transaction_id) = self.get_transaction_context();
2415                let planner = self.create_planner_for_store(
2416                    Arc::clone(&active),
2417                    viewing_epoch,
2418                    transaction_id,
2419                );
2420                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2421
2422                let executor = Executor::with_columns(physical_plan.columns.clone())
2423                    .with_deadline(self.query_deadline());
2424                let _result = executor.execute(physical_plan.operator.as_mut())?;
2425
2426                let total_time_ms;
2427                #[cfg(not(target_arch = "wasm32"))]
2428                {
2429                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2430                }
2431                #[cfg(target_arch = "wasm32")]
2432                {
2433                    total_time_ms = 0.0;
2434                }
2435
2436                let profile_tree = crate::query::profile::build_profile_tree(
2437                    &optimized_plan.root,
2438                    &mut entries.into_iter(),
2439                );
2440                Ok(crate::query::profile::profile_result(
2441                    &profile_tree,
2442                    total_time_ms,
2443                ))
2444            });
2445        }
2446
2447        let has_mutations = optimized_plan.root.has_mutations();
2448
2449        let result = self.with_auto_commit(has_mutations, || {
2450            // Get transaction context for MVCC visibility
2451            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2452
2453            // Convert to physical plan with transaction context
2454            let planner =
2455                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2456            let mut physical_plan = planner.plan(&optimized_plan)?;
2457
2458            // Execute the plan
2459            let executor = Executor::with_columns(physical_plan.columns.clone())
2460                .with_deadline(self.query_deadline());
2461            executor.execute(physical_plan.operator.as_mut())
2462        });
2463
2464        #[cfg(feature = "metrics")]
2465        {
2466            #[cfg(not(target_arch = "wasm32"))]
2467            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2468            #[cfg(target_arch = "wasm32")]
2469            let elapsed_ms = None;
2470            self.record_query_metrics("cypher", elapsed_ms, &result);
2471        }
2472
2473        result
2474    }
2475
2476    /// Executes a Gremlin query.
2477    ///
2478    /// # Errors
2479    ///
2480    /// Returns an error if the query fails to parse or execute.
2481    ///
2482    /// # Examples
2483    ///
2484    /// ```no_run
2485    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2486    /// use grafeo_engine::GrafeoDB;
2487    ///
2488    /// let db = GrafeoDB::new_in_memory();
2489    /// let session = db.session();
2490    ///
2491    /// // Create some nodes first
2492    /// session.create_node(&["Person"]);
2493    ///
2494    /// // Query using Gremlin
2495    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
2496    /// # Ok(())
2497    /// # }
2498    /// ```
2499    #[cfg(feature = "gremlin")]
2500    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
2501        use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};
2502
2503        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2504        let start_time = Instant::now();
2505
2506        // Parse and translate the query to a logical plan
2507        let logical_plan = gremlin::translate(query)?;
2508
2509        // Semantic validation
2510        let mut binder = Binder::new();
2511        let _binding_context = binder.bind(&logical_plan)?;
2512
2513        // Optimize the plan
2514        let active = self.active_store();
2515        let optimizer = Optimizer::from_graph_store(&*active);
2516        let optimized_plan = optimizer.optimize(logical_plan)?;
2517
2518        let has_mutations = optimized_plan.root.has_mutations();
2519
2520        let result = self.with_auto_commit(has_mutations, || {
2521            // Get transaction context for MVCC visibility
2522            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2523
2524            // Convert to physical plan with transaction context
2525            let planner =
2526                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2527            let mut physical_plan = planner.plan(&optimized_plan)?;
2528
2529            // Execute the plan
2530            let executor = Executor::with_columns(physical_plan.columns.clone())
2531                .with_deadline(self.query_deadline());
2532            executor.execute(physical_plan.operator.as_mut())
2533        });
2534
2535        #[cfg(feature = "metrics")]
2536        {
2537            #[cfg(not(target_arch = "wasm32"))]
2538            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2539            #[cfg(target_arch = "wasm32")]
2540            let elapsed_ms = None;
2541            self.record_query_metrics("gremlin", elapsed_ms, &result);
2542        }
2543
2544        result
2545    }
2546
2547    /// Executes a Gremlin query with parameters.
2548    ///
2549    /// # Errors
2550    ///
2551    /// Returns an error if the query fails to parse or execute.
2552    #[cfg(feature = "gremlin")]
2553    pub fn execute_gremlin_with_params(
2554        &self,
2555        query: &str,
2556        params: std::collections::HashMap<String, Value>,
2557    ) -> Result<QueryResult> {
2558        use crate::query::processor::{QueryLanguage, QueryProcessor};
2559
2560        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2561        let start_time = Instant::now();
2562
2563        let has_mutations = Self::query_looks_like_mutation(query);
2564        let active = self.active_store();
2565
2566        let result = self.with_auto_commit(has_mutations, || {
2567            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2568            let processor = QueryProcessor::for_graph_store_with_transaction(
2569                Arc::clone(&active),
2570                Arc::clone(&self.transaction_manager),
2571            )?;
2572            let processor = if let Some(transaction_id) = transaction_id {
2573                processor.with_transaction_context(viewing_epoch, transaction_id)
2574            } else {
2575                processor
2576            };
2577            processor.process(query, QueryLanguage::Gremlin, Some(&params))
2578        });
2579
2580        #[cfg(feature = "metrics")]
2581        {
2582            #[cfg(not(target_arch = "wasm32"))]
2583            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2584            #[cfg(target_arch = "wasm32")]
2585            let elapsed_ms = None;
2586            self.record_query_metrics("gremlin", elapsed_ms, &result);
2587        }
2588
2589        result
2590    }
2591
2592    /// Executes a GraphQL query against the LPG store.
2593    ///
2594    /// # Errors
2595    ///
2596    /// Returns an error if the query fails to parse or execute.
2597    ///
2598    /// # Examples
2599    ///
2600    /// ```no_run
2601    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2602    /// use grafeo_engine::GrafeoDB;
2603    ///
2604    /// let db = GrafeoDB::new_in_memory();
2605    /// let session = db.session();
2606    ///
2607    /// // Create some nodes first
2608    /// session.create_node(&["User"]);
2609    ///
2610    /// // Query using GraphQL
2611    /// let result = session.execute_graphql("query { user { id name } }")?;
2612    /// # Ok(())
2613    /// # }
2614    /// ```
2615    #[cfg(feature = "graphql")]
2616    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
2617        use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::graphql};
2618
2619        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2620        let start_time = Instant::now();
2621
2622        let logical_plan = graphql::translate(query)?;
2623        let mut binder = Binder::new();
2624        let _binding_context = binder.bind(&logical_plan)?;
2625
2626        let active = self.active_store();
2627        let optimizer = Optimizer::from_graph_store(&*active);
2628        let optimized_plan = optimizer.optimize(logical_plan)?;
2629        let has_mutations = optimized_plan.root.has_mutations();
2630
2631        let result = self.with_auto_commit(has_mutations, || {
2632            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2633            let planner =
2634                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2635            let mut physical_plan = planner.plan(&optimized_plan)?;
2636            let executor = Executor::with_columns(physical_plan.columns.clone())
2637                .with_deadline(self.query_deadline());
2638            executor.execute(physical_plan.operator.as_mut())
2639        });
2640
2641        #[cfg(feature = "metrics")]
2642        {
2643            #[cfg(not(target_arch = "wasm32"))]
2644            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2645            #[cfg(target_arch = "wasm32")]
2646            let elapsed_ms = None;
2647            self.record_query_metrics("graphql", elapsed_ms, &result);
2648        }
2649
2650        result
2651    }
2652
2653    /// Executes a GraphQL query with parameters.
2654    ///
2655    /// # Errors
2656    ///
2657    /// Returns an error if the query fails to parse or execute.
2658    #[cfg(feature = "graphql")]
2659    pub fn execute_graphql_with_params(
2660        &self,
2661        query: &str,
2662        params: std::collections::HashMap<String, Value>,
2663    ) -> Result<QueryResult> {
2664        use crate::query::processor::{QueryLanguage, QueryProcessor};
2665
2666        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2667        let start_time = Instant::now();
2668
2669        let has_mutations = Self::query_looks_like_mutation(query);
2670        let active = self.active_store();
2671
2672        let result = self.with_auto_commit(has_mutations, || {
2673            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2674            let processor = QueryProcessor::for_graph_store_with_transaction(
2675                Arc::clone(&active),
2676                Arc::clone(&self.transaction_manager),
2677            )?;
2678            let processor = if let Some(transaction_id) = transaction_id {
2679                processor.with_transaction_context(viewing_epoch, transaction_id)
2680            } else {
2681                processor
2682            };
2683            processor.process(query, QueryLanguage::GraphQL, Some(&params))
2684        });
2685
2686        #[cfg(feature = "metrics")]
2687        {
2688            #[cfg(not(target_arch = "wasm32"))]
2689            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2690            #[cfg(target_arch = "wasm32")]
2691            let elapsed_ms = None;
2692            self.record_query_metrics("graphql", elapsed_ms, &result);
2693        }
2694
2695        result
2696    }
2697
2698    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
2699    ///
2700    /// # Errors
2701    ///
2702    /// Returns an error if the query fails to parse or execute.
2703    ///
2704    /// # Examples
2705    ///
2706    /// ```no_run
2707    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2708    /// use grafeo_engine::GrafeoDB;
2709    ///
2710    /// let db = GrafeoDB::new_in_memory();
2711    /// let session = db.session();
2712    ///
2713    /// let result = session.execute_sql(
2714    ///     "SELECT * FROM GRAPH_TABLE (
2715    ///         MATCH (n:Person)
2716    ///         COLUMNS (n.name AS name)
2717    ///     )"
2718    /// )?;
2719    /// # Ok(())
2720    /// # }
2721    /// ```
2722    #[cfg(feature = "sql-pgq")]
2723    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
2724        use crate::query::{
2725            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
2726            processor::QueryLanguage, translators::sql_pgq,
2727        };
2728
2729        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2730        let start_time = Instant::now();
2731
2732        // Parse and translate (always needed to check for DDL)
2733        let logical_plan = sql_pgq::translate(query)?;
2734
2735        // Handle DDL statements directly (they don't go through the query pipeline)
2736        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
2737            return Ok(QueryResult {
2738                columns: vec!["status".into()],
2739                column_types: vec![grafeo_common::types::LogicalType::String],
2740                rows: vec![vec![Value::from(format!(
2741                    "Property graph '{}' created ({} node tables, {} edge tables)",
2742                    cpg.name,
2743                    cpg.node_tables.len(),
2744                    cpg.edge_tables.len()
2745                ))]],
2746                execution_time_ms: None,
2747                rows_scanned: None,
2748                status_message: None,
2749                gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2750            });
2751        }
2752
2753        let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());
2754
2755        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2756            cached_plan
2757        } else {
2758            let mut binder = Binder::new();
2759            let _binding_context = binder.bind(&logical_plan)?;
2760            let active = self.active_store();
2761            let optimizer = Optimizer::from_graph_store(&*active);
2762            let plan = optimizer.optimize(logical_plan)?;
2763            self.query_cache.put_optimized(cache_key, plan.clone());
2764            plan
2765        };
2766
2767        let active = self.active_store();
2768        let has_mutations = optimized_plan.root.has_mutations();
2769
2770        let result = self.with_auto_commit(has_mutations, || {
2771            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2772            let planner =
2773                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2774            let mut physical_plan = planner.plan(&optimized_plan)?;
2775            let executor = Executor::with_columns(physical_plan.columns.clone())
2776                .with_deadline(self.query_deadline());
2777            executor.execute(physical_plan.operator.as_mut())
2778        });
2779
2780        #[cfg(feature = "metrics")]
2781        {
2782            #[cfg(not(target_arch = "wasm32"))]
2783            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2784            #[cfg(target_arch = "wasm32")]
2785            let elapsed_ms = None;
2786            self.record_query_metrics("sql", elapsed_ms, &result);
2787        }
2788
2789        result
2790    }
2791
2792    /// Executes a SQL/PGQ query with parameters.
2793    ///
2794    /// # Errors
2795    ///
2796    /// Returns an error if the query fails to parse or execute.
2797    #[cfg(feature = "sql-pgq")]
2798    pub fn execute_sql_with_params(
2799        &self,
2800        query: &str,
2801        params: std::collections::HashMap<String, Value>,
2802    ) -> Result<QueryResult> {
2803        use crate::query::processor::{QueryLanguage, QueryProcessor};
2804
2805        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2806        let start_time = Instant::now();
2807
2808        let has_mutations = Self::query_looks_like_mutation(query);
2809        let active = self.active_store();
2810
2811        let result = self.with_auto_commit(has_mutations, || {
2812            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2813            let processor = QueryProcessor::for_graph_store_with_transaction(
2814                Arc::clone(&active),
2815                Arc::clone(&self.transaction_manager),
2816            )?;
2817            let processor = if let Some(transaction_id) = transaction_id {
2818                processor.with_transaction_context(viewing_epoch, transaction_id)
2819            } else {
2820                processor
2821            };
2822            processor.process(query, QueryLanguage::SqlPgq, Some(&params))
2823        });
2824
2825        #[cfg(feature = "metrics")]
2826        {
2827            #[cfg(not(target_arch = "wasm32"))]
2828            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2829            #[cfg(target_arch = "wasm32")]
2830            let elapsed_ms = None;
2831            self.record_query_metrics("sql", elapsed_ms, &result);
2832        }
2833
2834        result
2835    }
2836
2837    /// Executes a query in the specified language by name.
2838    ///
2839    /// Supported language names: `"gql"`, `"cypher"`, `"gremlin"`, `"graphql"`,
2840    /// `"graphql-rdf"`, `"sparql"`, `"sql"`. Each requires the corresponding feature flag.
2841    ///
2842    /// # Errors
2843    ///
2844    /// Returns an error if the language is unknown/disabled or the query fails.
2845    pub fn execute_language(
2846        &self,
2847        query: &str,
2848        language: &str,
2849        params: Option<std::collections::HashMap<String, Value>>,
2850    ) -> Result<QueryResult> {
2851        let _span = tracing::info_span!(
2852            "grafeo::session::execute",
2853            language,
2854            query_len = query.len(),
2855        )
2856        .entered();
2857        match language {
2858            "gql" => {
2859                if let Some(p) = params {
2860                    self.execute_with_params(query, p)
2861                } else {
2862                    self.execute(query)
2863                }
2864            }
2865            #[cfg(feature = "cypher")]
2866            "cypher" => {
2867                if let Some(p) = params {
2868                    use crate::query::processor::{QueryLanguage, QueryProcessor};
2869
2870                    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2871                    let start_time = Instant::now();
2872
2873                    let has_mutations = Self::query_looks_like_mutation(query);
2874                    let active = self.active_store();
2875                    let result = self.with_auto_commit(has_mutations, || {
2876                        let processor = QueryProcessor::for_graph_store_with_transaction(
2877                            Arc::clone(&active),
2878                            Arc::clone(&self.transaction_manager),
2879                        )?;
2880                        let (viewing_epoch, transaction_id) = self.get_transaction_context();
2881                        let processor = if let Some(transaction_id) = transaction_id {
2882                            processor.with_transaction_context(viewing_epoch, transaction_id)
2883                        } else {
2884                            processor
2885                        };
2886                        processor.process(query, QueryLanguage::Cypher, Some(&p))
2887                    });
2888
2889                    #[cfg(feature = "metrics")]
2890                    {
2891                        #[cfg(not(target_arch = "wasm32"))]
2892                        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2893                        #[cfg(target_arch = "wasm32")]
2894                        let elapsed_ms = None;
2895                        self.record_query_metrics("cypher", elapsed_ms, &result);
2896                    }
2897
2898                    result
2899                } else {
2900                    self.execute_cypher(query)
2901                }
2902            }
2903            #[cfg(feature = "gremlin")]
2904            "gremlin" => {
2905                if let Some(p) = params {
2906                    self.execute_gremlin_with_params(query, p)
2907                } else {
2908                    self.execute_gremlin(query)
2909                }
2910            }
2911            #[cfg(feature = "graphql")]
2912            "graphql" => {
2913                if let Some(p) = params {
2914                    self.execute_graphql_with_params(query, p)
2915                } else {
2916                    self.execute_graphql(query)
2917                }
2918            }
2919            #[cfg(all(feature = "graphql", feature = "rdf"))]
2920            "graphql-rdf" => {
2921                if let Some(p) = params {
2922                    self.execute_graphql_rdf_with_params(query, p)
2923                } else {
2924                    self.execute_graphql_rdf(query)
2925                }
2926            }
2927            #[cfg(feature = "sql-pgq")]
2928            "sql" | "sql-pgq" => {
2929                if let Some(p) = params {
2930                    self.execute_sql_with_params(query, p)
2931                } else {
2932                    self.execute_sql(query)
2933                }
2934            }
2935            #[cfg(all(feature = "sparql", feature = "rdf"))]
2936            "sparql" => {
2937                if let Some(p) = params {
2938                    self.execute_sparql_with_params(query, p)
2939                } else {
2940                    self.execute_sparql(query)
2941                }
2942            }
2943            other => Err(grafeo_common::utils::error::Error::Query(
2944                grafeo_common::utils::error::QueryError::new(
2945                    grafeo_common::utils::error::QueryErrorKind::Semantic,
2946                    format!("Unknown query language: '{other}'"),
2947                ),
2948            )),
2949        }
2950    }
2951
2952    /// Begins a new transaction.
2953    ///
2954    /// # Errors
2955    ///
2956    /// Returns an error if a transaction is already active.
2957    ///
2958    /// # Examples
2959    ///
2960    /// ```no_run
2961    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2962    /// use grafeo_engine::GrafeoDB;
2963    ///
2964    /// let db = GrafeoDB::new_in_memory();
2965    /// let mut session = db.session();
2966    ///
2967    /// session.begin_transaction()?;
2968    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
2969    /// session.execute("INSERT (:Person {name: 'Gus'})")?;
2970    /// session.commit()?; // Both inserts committed atomically
2971    /// # Ok(())
2972    /// # }
2973    /// ```
2974    /// Clears all cached query plans.
2975    ///
2976    /// The plan cache is shared across all sessions on the same database,
2977    /// so clearing from one session affects all sessions.
2978    pub fn clear_plan_cache(&self) {
2979        self.query_cache.clear();
2980    }
2981
2982    /// Begins a new transaction on this session.
2983    ///
2984    /// Uses the default isolation level (`SnapshotIsolation`).
2985    ///
2986    /// # Errors
2987    ///
2988    /// Returns an error if a transaction is already active.
2989    pub fn begin_transaction(&mut self) -> Result<()> {
2990        self.begin_transaction_inner(false, None)
2991    }
2992
2993    /// Begins a transaction with a specific isolation level.
2994    ///
2995    /// See [`begin_transaction`](Self::begin_transaction) for the default (`SnapshotIsolation`).
2996    ///
2997    /// # Errors
2998    ///
2999    /// Returns an error if a transaction is already active.
3000    pub fn begin_transaction_with_isolation(
3001        &mut self,
3002        isolation_level: crate::transaction::IsolationLevel,
3003    ) -> Result<()> {
3004        self.begin_transaction_inner(false, Some(isolation_level))
3005    }
3006
3007    /// Core transaction begin logic, usable from both `&mut self` and `&self` paths.
3008    fn begin_transaction_inner(
3009        &self,
3010        read_only: bool,
3011        isolation_level: Option<crate::transaction::IsolationLevel>,
3012    ) -> Result<()> {
3013        let _span = tracing::debug_span!("grafeo::tx::begin", read_only).entered();
3014        let mut current = self.current_transaction.lock();
3015        if current.is_some() {
3016            // Nested transaction: create an auto-savepoint instead of a new tx.
3017            drop(current);
3018            let mut depth = self.transaction_nesting_depth.lock();
3019            *depth += 1;
3020            let sp_name = format!("_nested_tx_{}", *depth);
3021            self.savepoint(&sp_name)?;
3022            return Ok(());
3023        }
3024
3025        let active = self.active_lpg_store();
3026        self.transaction_start_node_count
3027            .store(active.node_count(), Ordering::Relaxed);
3028        self.transaction_start_edge_count
3029            .store(active.edge_count(), Ordering::Relaxed);
3030        let transaction_id = if let Some(level) = isolation_level {
3031            self.transaction_manager.begin_with_isolation(level)
3032        } else {
3033            self.transaction_manager.begin()
3034        };
3035        *current = Some(transaction_id);
3036        *self.read_only_tx.lock() = read_only || self.db_read_only;
3037
3038        // Record the initial graph as "touched" for cross-graph atomicity.
3039        // Uses the full storage key (schema/graph) for schema-scoped resolution.
3040        let key = self.active_graph_storage_key();
3041        let mut touched = self.touched_graphs.lock();
3042        touched.clear();
3043        touched.push(key);
3044
3045        #[cfg(feature = "metrics")]
3046        {
3047            crate::metrics::record_metric!(self.metrics, tx_active, inc);
3048            #[cfg(not(target_arch = "wasm32"))]
3049            {
3050                *self.tx_start_time.lock() = Some(Instant::now());
3051            }
3052        }
3053
3054        Ok(())
3055    }
3056
3057    /// Commits the current transaction.
3058    ///
3059    /// Makes all changes since [`begin_transaction`](Self::begin_transaction) permanent.
3060    ///
3061    /// # Errors
3062    ///
3063    /// Returns an error if no transaction is active.
3064    pub fn commit(&mut self) -> Result<()> {
3065        self.commit_inner()
3066    }
3067
3068    /// Core commit logic, usable from both `&mut self` and `&self` paths.
3069    fn commit_inner(&self) -> Result<()> {
3070        let _span = tracing::debug_span!("grafeo::tx::commit").entered();
3071        // Nested transaction: release the auto-savepoint (changes are preserved).
3072        {
3073            let mut depth = self.transaction_nesting_depth.lock();
3074            if *depth > 0 {
3075                let sp_name = format!("_nested_tx_{depth}");
3076                *depth -= 1;
3077                drop(depth);
3078                return self.release_savepoint(&sp_name);
3079            }
3080        }
3081
3082        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3083            grafeo_common::utils::error::Error::Transaction(
3084                grafeo_common::utils::error::TransactionError::InvalidState(
3085                    "No active transaction".to_string(),
3086                ),
3087            )
3088        })?;
3089
3090        // Validate the transaction first (conflict detection) before committing data.
3091        // If this fails, we rollback the data changes instead of making them permanent.
3092        let touched = self.touched_graphs.lock().clone();
3093        let commit_epoch = match self.transaction_manager.commit(transaction_id) {
3094            Ok(epoch) => epoch,
3095            Err(e) => {
3096                // Conflict detected: rollback the data changes
3097                for graph_name in &touched {
3098                    let store = self.resolve_store(graph_name);
3099                    store.rollback_transaction_properties(transaction_id);
3100                }
3101                #[cfg(feature = "rdf")]
3102                self.rollback_rdf_transaction(transaction_id);
3103                *self.read_only_tx.lock() = self.db_read_only;
3104                self.savepoints.lock().clear();
3105                self.touched_graphs.lock().clear();
3106                #[cfg(feature = "metrics")]
3107                {
3108                    crate::metrics::record_metric!(self.metrics, tx_active, dec);
3109                    crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
3110                    #[cfg(not(target_arch = "wasm32"))]
3111                    if let Some(start) = self.tx_start_time.lock().take() {
3112                        let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3113                        crate::metrics::record_metric!(
3114                            self.metrics,
3115                            tx_duration,
3116                            observe duration_ms
3117                        );
3118                    }
3119                }
3120                return Err(e);
3121            }
3122        };
3123
3124        // Finalize PENDING epochs: make uncommitted versions visible at the commit epoch.
3125        for graph_name in &touched {
3126            let store = self.resolve_store(graph_name);
3127            store.finalize_version_epochs(transaction_id, commit_epoch);
3128        }
3129
3130        // Commit succeeded: discard undo logs (make changes permanent)
3131        #[cfg(feature = "rdf")]
3132        self.commit_rdf_transaction(transaction_id);
3133
3134        for graph_name in &touched {
3135            let store = self.resolve_store(graph_name);
3136            store.commit_transaction_properties(transaction_id);
3137        }
3138
3139        // Sync epoch for all touched graphs so that convenience lookups
3140        // (edge_type, get_edge, get_node) can see versions at the latest epoch.
3141        let current_epoch = self.transaction_manager.current_epoch();
3142        for graph_name in &touched {
3143            let store = self.resolve_store(graph_name);
3144            store.sync_epoch(current_epoch);
3145        }
3146
3147        // Reset read-only flag, clear savepoints and touched graphs
3148        *self.read_only_tx.lock() = self.db_read_only;
3149        self.savepoints.lock().clear();
3150        self.touched_graphs.lock().clear();
3151
3152        // Auto-GC: periodically prune old MVCC versions
3153        if self.gc_interval > 0 {
3154            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
3155            if count.is_multiple_of(self.gc_interval) {
3156                let min_epoch = self.transaction_manager.min_active_epoch();
3157                for graph_name in &touched {
3158                    let store = self.resolve_store(graph_name);
3159                    store.gc_versions(min_epoch);
3160                }
3161                self.transaction_manager.gc();
3162                #[cfg(feature = "metrics")]
3163                crate::metrics::record_metric!(self.metrics, gc_runs, inc);
3164            }
3165        }
3166
3167        #[cfg(feature = "metrics")]
3168        {
3169            crate::metrics::record_metric!(self.metrics, tx_active, dec);
3170            crate::metrics::record_metric!(self.metrics, tx_committed, inc);
3171            #[cfg(not(target_arch = "wasm32"))]
3172            if let Some(start) = self.tx_start_time.lock().take() {
3173                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3174                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3175            }
3176        }
3177
3178        Ok(())
3179    }
3180
3181    /// Aborts the current transaction.
3182    ///
3183    /// Discards all changes since [`begin_transaction`](Self::begin_transaction).
3184    ///
3185    /// # Errors
3186    ///
3187    /// Returns an error if no transaction is active.
3188    ///
3189    /// # Examples
3190    ///
3191    /// ```no_run
3192    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3193    /// use grafeo_engine::GrafeoDB;
3194    ///
3195    /// let db = GrafeoDB::new_in_memory();
3196    /// let mut session = db.session();
3197    ///
3198    /// session.begin_transaction()?;
3199    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3200    /// session.rollback()?; // Insert is discarded
3201    /// # Ok(())
3202    /// # }
3203    /// ```
3204    pub fn rollback(&mut self) -> Result<()> {
3205        self.rollback_inner()
3206    }
3207
3208    /// Core rollback logic, usable from both `&mut self` and `&self` paths.
3209    fn rollback_inner(&self) -> Result<()> {
3210        let _span = tracing::debug_span!("grafeo::tx::rollback").entered();
3211        // Nested transaction: rollback to the auto-savepoint.
3212        {
3213            let mut depth = self.transaction_nesting_depth.lock();
3214            if *depth > 0 {
3215                let sp_name = format!("_nested_tx_{depth}");
3216                *depth -= 1;
3217                drop(depth);
3218                return self.rollback_to_savepoint(&sp_name);
3219            }
3220        }
3221
3222        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3223            grafeo_common::utils::error::Error::Transaction(
3224                grafeo_common::utils::error::TransactionError::InvalidState(
3225                    "No active transaction".to_string(),
3226                ),
3227            )
3228        })?;
3229
3230        // Reset read-only flag
3231        *self.read_only_tx.lock() = self.db_read_only;
3232
3233        // Discard uncommitted versions in ALL touched LPG stores (cross-graph atomicity).
3234        let touched = self.touched_graphs.lock().clone();
3235        for graph_name in &touched {
3236            let store = self.resolve_store(graph_name);
3237            store.discard_uncommitted_versions(transaction_id);
3238        }
3239
3240        // Discard pending operations in the RDF store
3241        #[cfg(feature = "rdf")]
3242        self.rollback_rdf_transaction(transaction_id);
3243
3244        // Clear savepoints and touched graphs
3245        self.savepoints.lock().clear();
3246        self.touched_graphs.lock().clear();
3247
3248        // Mark transaction as aborted in the manager
3249        let result = self.transaction_manager.abort(transaction_id);
3250
3251        #[cfg(feature = "metrics")]
3252        if result.is_ok() {
3253            crate::metrics::record_metric!(self.metrics, tx_active, dec);
3254            crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
3255            #[cfg(not(target_arch = "wasm32"))]
3256            if let Some(start) = self.tx_start_time.lock().take() {
3257                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3258                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3259            }
3260        }
3261
3262        result
3263    }
3264
3265    /// Creates a named savepoint within the current transaction.
3266    ///
3267    /// The savepoint captures the current node/edge ID counters so that
3268    /// [`rollback_to_savepoint`](Self::rollback_to_savepoint) can discard
3269    /// entities created after this point.
3270    ///
3271    /// # Errors
3272    ///
3273    /// Returns an error if no transaction is active.
3274    pub fn savepoint(&self, name: &str) -> Result<()> {
3275        let tx_id = self.current_transaction.lock().ok_or_else(|| {
3276            grafeo_common::utils::error::Error::Transaction(
3277                grafeo_common::utils::error::TransactionError::InvalidState(
3278                    "No active transaction".to_string(),
3279                ),
3280            )
3281        })?;
3282
3283        // Capture state for every graph touched so far.
3284        let touched = self.touched_graphs.lock().clone();
3285        let graph_snapshots: Vec<GraphSavepoint> = touched
3286            .iter()
3287            .map(|graph_name| {
3288                let store = self.resolve_store(graph_name);
3289                GraphSavepoint {
3290                    graph_name: graph_name.clone(),
3291                    next_node_id: store.peek_next_node_id(),
3292                    next_edge_id: store.peek_next_edge_id(),
3293                    undo_log_position: store.property_undo_log_position(tx_id),
3294                }
3295            })
3296            .collect();
3297
3298        self.savepoints.lock().push(SavepointState {
3299            name: name.to_string(),
3300            graph_snapshots,
3301            active_graph: self.current_graph.lock().clone(),
3302        });
3303        Ok(())
3304    }
3305
3306    /// Rolls back to a named savepoint, undoing all writes made after it.
3307    ///
3308    /// The savepoint and any savepoints created after it are removed.
3309    /// Entities with IDs >= the savepoint snapshot are discarded.
3310    ///
3311    /// # Errors
3312    ///
3313    /// Returns an error if no transaction is active or the savepoint does not exist.
3314    pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
3315        let transaction_id = self.current_transaction.lock().ok_or_else(|| {
3316            grafeo_common::utils::error::Error::Transaction(
3317                grafeo_common::utils::error::TransactionError::InvalidState(
3318                    "No active transaction".to_string(),
3319                ),
3320            )
3321        })?;
3322
3323        let mut savepoints = self.savepoints.lock();
3324
3325        // Find the savepoint by name (search from the end for nested savepoints)
3326        let pos = savepoints
3327            .iter()
3328            .rposition(|sp| sp.name == name)
3329            .ok_or_else(|| {
3330                grafeo_common::utils::error::Error::Transaction(
3331                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
3332                        "Savepoint '{name}' not found"
3333                    )),
3334                )
3335            })?;
3336
3337        let sp_state = savepoints[pos].clone();
3338
3339        // Remove this savepoint and all later ones
3340        savepoints.truncate(pos);
3341        drop(savepoints);
3342
3343        // Roll back each graph that was captured in the savepoint.
3344        for gs in &sp_state.graph_snapshots {
3345            let store = self.resolve_store(&gs.graph_name);
3346
3347            // Replay property/label undo entries recorded after the savepoint
3348            store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);
3349
3350            // Discard entities created after the savepoint
3351            let current_next_node = store.peek_next_node_id();
3352            let current_next_edge = store.peek_next_edge_id();
3353
3354            let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
3355                .map(NodeId::new)
3356                .collect();
3357            let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
3358                .map(EdgeId::new)
3359                .collect();
3360
3361            if !node_ids.is_empty() || !edge_ids.is_empty() {
3362                store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
3363            }
3364        }
3365
3366        // Also roll back any graphs that were touched AFTER the savepoint
3367        // but not captured in it. These need full discard since the savepoint
3368        // didn't include them.
3369        let touched = self.touched_graphs.lock().clone();
3370        for graph_name in &touched {
3371            let already_captured = sp_state
3372                .graph_snapshots
3373                .iter()
3374                .any(|gs| gs.graph_name == *graph_name);
3375            if !already_captured {
3376                let store = self.resolve_store(graph_name);
3377                store.discard_uncommitted_versions(transaction_id);
3378            }
3379        }
3380
3381        // Restore touched_graphs to only the graphs that were known at savepoint time.
3382        let mut touched = self.touched_graphs.lock();
3383        touched.clear();
3384        for gs in &sp_state.graph_snapshots {
3385            if !touched.contains(&gs.graph_name) {
3386                touched.push(gs.graph_name.clone());
3387            }
3388        }
3389
3390        Ok(())
3391    }
3392
3393    /// Releases (removes) a named savepoint without rolling back.
3394    ///
3395    /// # Errors
3396    ///
3397    /// Returns an error if no transaction is active or the savepoint does not exist.
3398    pub fn release_savepoint(&self, name: &str) -> Result<()> {
3399        let _tx_id = self.current_transaction.lock().ok_or_else(|| {
3400            grafeo_common::utils::error::Error::Transaction(
3401                grafeo_common::utils::error::TransactionError::InvalidState(
3402                    "No active transaction".to_string(),
3403                ),
3404            )
3405        })?;
3406
3407        let mut savepoints = self.savepoints.lock();
3408        let pos = savepoints
3409            .iter()
3410            .rposition(|sp| sp.name == name)
3411            .ok_or_else(|| {
3412                grafeo_common::utils::error::Error::Transaction(
3413                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
3414                        "Savepoint '{name}' not found"
3415                    )),
3416                )
3417            })?;
3418        savepoints.remove(pos);
3419        Ok(())
3420    }
3421
3422    /// Returns whether a transaction is active.
3423    #[must_use]
3424    pub fn in_transaction(&self) -> bool {
3425        self.current_transaction.lock().is_some()
3426    }
3427
3428    /// Returns the current transaction ID, if any.
3429    #[must_use]
3430    pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
3431        *self.current_transaction.lock()
3432    }
3433
    /// Returns a reference to the transaction manager used by this session.
    ///
    /// This is the same manager the session uses to begin, commit and abort
    /// its transactions.
    #[must_use]
    pub(crate) fn transaction_manager(&self) -> &TransactionManager {
        &self.transaction_manager
    }
3439
3440    /// Returns the store's current node count and the count at transaction start.
3441    #[must_use]
3442    pub(crate) fn node_count_delta(&self) -> (usize, usize) {
3443        (
3444            self.transaction_start_node_count.load(Ordering::Relaxed),
3445            self.active_lpg_store().node_count(),
3446        )
3447    }
3448
3449    /// Returns the store's current edge count and the count at transaction start.
3450    #[must_use]
3451    pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
3452        (
3453            self.transaction_start_edge_count.load(Ordering::Relaxed),
3454            self.active_lpg_store().edge_count(),
3455        )
3456    }
3457
3458    /// Prepares the current transaction for a two-phase commit.
3459    ///
3460    /// Returns a [`PreparedCommit`](crate::transaction::PreparedCommit) that
3461    /// lets you inspect pending changes and attach metadata before finalizing.
3462    /// The mutable borrow prevents concurrent operations while the commit is
3463    /// pending.
3464    ///
3465    /// If the `PreparedCommit` is dropped without calling `commit()` or
3466    /// `abort()`, the transaction is automatically rolled back.
3467    ///
3468    /// # Errors
3469    ///
3470    /// Returns an error if no transaction is active.
3471    ///
3472    /// # Examples
3473    ///
3474    /// ```no_run
3475    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3476    /// use grafeo_engine::GrafeoDB;
3477    ///
3478    /// let db = GrafeoDB::new_in_memory();
3479    /// let mut session = db.session();
3480    ///
3481    /// session.begin_transaction()?;
3482    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3483    ///
3484    /// let mut prepared = session.prepare_commit()?;
3485    /// println!("Nodes written: {}", prepared.info().nodes_written);
3486    /// prepared.set_metadata("audit_user", "admin");
3487    /// prepared.commit()?;
3488    /// # Ok(())
3489    /// # }
3490    /// ```
3491    pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
3492        crate::transaction::PreparedCommit::new(self)
3493    }
3494
    /// Sets auto-commit mode.
    ///
    /// When enabled, a mutating query executed outside an explicit
    /// transaction is wrapped in its own begin/commit.
    pub fn set_auto_commit(&mut self, auto_commit: bool) {
        self.auto_commit = auto_commit;
    }
3499
    /// Returns whether auto-commit is enabled.
    ///
    /// This reports the session flag only; it does not say whether a
    /// transaction is currently active.
    #[must_use]
    pub fn auto_commit(&self) -> bool {
        self.auto_commit
    }
3505
3506    /// Returns `true` if auto-commit should wrap this execution.
3507    ///
3508    /// Auto-commit kicks in when: the session is in auto-commit mode,
3509    /// no explicit transaction is active, and the query mutates data.
3510    fn needs_auto_commit(&self, has_mutations: bool) -> bool {
3511        self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
3512    }
3513
3514    /// Wraps `body` in an automatic begin/commit when [`needs_auto_commit`]
3515    /// returns `true`. On error the transaction is rolled back.
3516    fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
3517    where
3518        F: FnOnce() -> Result<QueryResult>,
3519    {
3520        if self.needs_auto_commit(has_mutations) {
3521            self.begin_transaction_inner(false, None)?;
3522            match body() {
3523                Ok(result) => {
3524                    self.commit_inner()?;
3525                    Ok(result)
3526                }
3527                Err(e) => {
3528                    let _ = self.rollback_inner();
3529                    Err(e)
3530                }
3531            }
3532        } else {
3533            body()
3534        }
3535    }
3536
3537    /// Quick heuristic: returns `true` when the query text looks like it
3538    /// performs a mutation. Used by `_with_params` paths that go through the
3539    /// `QueryProcessor` (where the logical plan isn't available before
3540    /// execution). False negatives are harmless: the data just won't be
3541    /// auto-committed, which matches the prior behaviour.
3542    fn query_looks_like_mutation(query: &str) -> bool {
3543        let upper = query.to_ascii_uppercase();
3544        upper.contains("INSERT")
3545            || upper.contains("CREATE")
3546            || upper.contains("DELETE")
3547            || upper.contains("MERGE")
3548            || upper.contains("SET")
3549            || upper.contains("REMOVE")
3550            || upper.contains("DROP")
3551            || upper.contains("ALTER")
3552    }
3553
3554    /// Computes the wall-clock deadline for query execution.
3555    #[must_use]
3556    fn query_deadline(&self) -> Option<Instant> {
3557        #[cfg(not(target_arch = "wasm32"))]
3558        {
3559            self.query_timeout.map(|d| Instant::now() + d)
3560        }
3561        #[cfg(target_arch = "wasm32")]
3562        {
3563            let _ = &self.query_timeout;
3564            None
3565        }
3566    }
3567
3568    /// Records query metrics for any language.
3569    ///
3570    /// Called after query execution to update counters, latency histogram,
3571    /// and per-language tracking. `elapsed_ms` should be `None` on WASM
3572    /// where `Instant` is unavailable.
3573    #[cfg(feature = "metrics")]
3574    fn record_query_metrics(
3575        &self,
3576        language: &str,
3577        elapsed_ms: Option<f64>,
3578        result: &Result<crate::database::QueryResult>,
3579    ) {
3580        use crate::metrics::record_metric;
3581
3582        record_metric!(self.metrics, query_count, inc);
3583        if let Some(ref reg) = self.metrics {
3584            reg.query_count_by_language.increment(language);
3585        }
3586        if let Some(ms) = elapsed_ms {
3587            record_metric!(self.metrics, query_latency, observe ms);
3588        }
3589        match result {
3590            Ok(r) => {
3591                let returned = r.rows.len() as u64;
3592                record_metric!(self.metrics, rows_returned, add returned);
3593                if let Some(scanned) = r.rows_scanned {
3594                    record_metric!(self.metrics, rows_scanned, add scanned);
3595                }
3596            }
3597            Err(e) => {
3598                record_metric!(self.metrics, query_errors, inc);
3599                // Detect timeout errors
3600                let msg = e.to_string();
3601                if msg.contains("exceeded timeout") {
3602                    record_metric!(self.metrics, query_timeouts, inc);
3603                }
3604            }
3605        }
3606    }
3607
3608    /// Evaluates a simple integer literal from a session parameter expression.
3609    fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
3610        use grafeo_adapters::query::gql::ast::{Expression, Literal};
3611        match expr {
3612            Expression::Literal(Literal::Integer(n)) => Some(*n),
3613            _ => None,
3614        }
3615    }
3616
3617    /// Returns the current transaction context for MVCC visibility.
3618    ///
3619    /// Returns `(viewing_epoch, transaction_id)` where:
3620    /// - `viewing_epoch` is the epoch at which to check version visibility
3621    /// - `transaction_id` is the current transaction ID (if in a transaction)
3622    #[must_use]
3623    fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
3624        // Time-travel override takes precedence (read-only, no tx context)
3625        if let Some(epoch) = *self.viewing_epoch_override.lock() {
3626            return (epoch, None);
3627        }
3628
3629        if let Some(transaction_id) = *self.current_transaction.lock() {
3630            // In a transaction: use the transaction's start epoch
3631            let epoch = self
3632                .transaction_manager
3633                .start_epoch(transaction_id)
3634                .unwrap_or_else(|| self.transaction_manager.current_epoch());
3635            (epoch, Some(transaction_id))
3636        } else {
3637            // No transaction: use current epoch
3638            (self.transaction_manager.current_epoch(), None)
3639        }
3640    }
3641
3642    /// Creates a planner with transaction context and constraint validator.
3643    ///
3644    /// The `store` parameter is the graph store to plan against (use
3645    /// `self.active_store()` for graph-aware execution).
3646    fn create_planner_for_store(
3647        &self,
3648        store: Arc<dyn GraphStoreMut>,
3649        viewing_epoch: EpochId,
3650        transaction_id: Option<TransactionId>,
3651    ) -> crate::query::Planner {
3652        self.create_planner_for_store_with_read_only(store, viewing_epoch, transaction_id, false)
3653    }
3654
3655    fn create_planner_for_store_with_read_only(
3656        &self,
3657        store: Arc<dyn GraphStoreMut>,
3658        viewing_epoch: EpochId,
3659        transaction_id: Option<TransactionId>,
3660        read_only: bool,
3661    ) -> crate::query::Planner {
3662        use crate::query::Planner;
3663        use grafeo_core::execution::operators::{LazyValue, SessionContext};
3664
3665        // Capture store reference for lazy introspection (only computed if info()/schema() called).
3666        let info_store = Arc::clone(&store);
3667        let schema_store = Arc::clone(&store);
3668
3669        let session_context = SessionContext {
3670            current_schema: self.current_schema(),
3671            current_graph: self.current_graph(),
3672            db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
3673            schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
3674        };
3675
3676        let mut planner = Planner::with_context(
3677            Arc::clone(&store),
3678            Arc::clone(&self.transaction_manager),
3679            transaction_id,
3680            viewing_epoch,
3681        )
3682        .with_factorized_execution(self.factorized_execution)
3683        .with_catalog(Arc::clone(&self.catalog))
3684        .with_session_context(session_context)
3685        .with_read_only(read_only);
3686
3687        // Attach the constraint validator for schema enforcement
3688        let validator =
3689            CatalogConstraintValidator::new(Arc::clone(&self.catalog)).with_store(store);
3690        planner = planner.with_validator(Arc::new(validator));
3691
3692        planner
3693    }
3694
3695    /// Builds a `Value::Map` for the `info()` introspection function.
3696    fn build_info_value(store: &dyn GraphStoreMut) -> Value {
3697        use grafeo_common::types::PropertyKey;
3698        use std::collections::BTreeMap;
3699
3700        let mut map = BTreeMap::new();
3701        map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
3702        map.insert(
3703            PropertyKey::from("node_count"),
3704            Value::Int64(store.node_count() as i64),
3705        );
3706        map.insert(
3707            PropertyKey::from("edge_count"),
3708            Value::Int64(store.edge_count() as i64),
3709        );
3710        map.insert(
3711            PropertyKey::from("version"),
3712            Value::String(env!("CARGO_PKG_VERSION").into()),
3713        );
3714        Value::Map(map.into())
3715    }
3716
3717    /// Builds a `Value::Map` for the `schema()` introspection function.
3718    fn build_schema_value(store: &dyn GraphStoreMut) -> Value {
3719        use grafeo_common::types::PropertyKey;
3720        use std::collections::BTreeMap;
3721
3722        let labels: Vec<Value> = store
3723            .all_labels()
3724            .into_iter()
3725            .map(|l| Value::String(l.into()))
3726            .collect();
3727        let edge_types: Vec<Value> = store
3728            .all_edge_types()
3729            .into_iter()
3730            .map(|t| Value::String(t.into()))
3731            .collect();
3732        let property_keys: Vec<Value> = store
3733            .all_property_keys()
3734            .into_iter()
3735            .map(|k| Value::String(k.into()))
3736            .collect();
3737
3738        let mut map = BTreeMap::new();
3739        map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
3740        map.insert(
3741            PropertyKey::from("edge_types"),
3742            Value::List(edge_types.into()),
3743        );
3744        map.insert(
3745            PropertyKey::from("property_keys"),
3746            Value::List(property_keys.into()),
3747        );
3748        Value::Map(map.into())
3749    }
3750
3751    /// Creates a node directly (bypassing query execution).
3752    ///
3753    /// This is a low-level API for testing and direct manipulation.
3754    /// If a transaction is active, the node will be versioned with the transaction ID.
3755    pub fn create_node(&self, labels: &[&str]) -> NodeId {
3756        let (epoch, transaction_id) = self.get_transaction_context();
3757        self.active_lpg_store().create_node_versioned(
3758            labels,
3759            epoch,
3760            transaction_id.unwrap_or(TransactionId::SYSTEM),
3761        )
3762    }
3763
3764    /// Creates a node with properties.
3765    ///
3766    /// If a transaction is active, the node will be versioned with the transaction ID.
3767    pub fn create_node_with_props<'a>(
3768        &self,
3769        labels: &[&str],
3770        properties: impl IntoIterator<Item = (&'a str, Value)>,
3771    ) -> NodeId {
3772        let (epoch, transaction_id) = self.get_transaction_context();
3773        self.active_lpg_store().create_node_with_props_versioned(
3774            labels,
3775            properties,
3776            epoch,
3777            transaction_id.unwrap_or(TransactionId::SYSTEM),
3778        )
3779    }
3780
3781    /// Creates an edge between two nodes.
3782    ///
3783    /// This is a low-level API for testing and direct manipulation.
3784    /// If a transaction is active, the edge will be versioned with the transaction ID.
3785    pub fn create_edge(
3786        &self,
3787        src: NodeId,
3788        dst: NodeId,
3789        edge_type: &str,
3790    ) -> grafeo_common::types::EdgeId {
3791        let (epoch, transaction_id) = self.get_transaction_context();
3792        self.active_lpg_store().create_edge_versioned(
3793            src,
3794            dst,
3795            edge_type,
3796            epoch,
3797            transaction_id.unwrap_or(TransactionId::SYSTEM),
3798        )
3799    }
3800
3801    // =========================================================================
3802    // Direct Lookup APIs (bypass query planning for O(1) point reads)
3803    // =========================================================================
3804
3805    /// Gets a node by ID directly, bypassing query planning.
3806    ///
3807    /// This is the fastest way to retrieve a single node when you know its ID.
3808    /// Skips parsing, binding, optimization, and physical planning entirely.
3809    ///
3810    /// # Performance
3811    ///
3812    /// - Time complexity: O(1) average case
3813    /// - No lock contention (uses DashMap internally)
3814    /// - ~20-30x faster than equivalent MATCH query
3815    ///
3816    /// # Example
3817    ///
3818    /// ```no_run
3819    /// # use grafeo_engine::GrafeoDB;
3820    /// # let db = GrafeoDB::new_in_memory();
3821    /// let session = db.session();
3822    /// let node_id = session.create_node(&["Person"]);
3823    ///
3824    /// // Direct lookup - O(1), no query planning
3825    /// let node = session.get_node(node_id);
3826    /// assert!(node.is_some());
3827    /// ```
3828    #[must_use]
3829    pub fn get_node(&self, id: NodeId) -> Option<Node> {
3830        let (epoch, transaction_id) = self.get_transaction_context();
3831        self.active_lpg_store().get_node_versioned(
3832            id,
3833            epoch,
3834            transaction_id.unwrap_or(TransactionId::SYSTEM),
3835        )
3836    }
3837
3838    /// Gets a single property from a node by ID, bypassing query planning.
3839    ///
3840    /// More efficient than `get_node()` when you only need one property,
3841    /// as it avoids loading the full node with all properties.
3842    ///
3843    /// # Performance
3844    ///
3845    /// - Time complexity: O(1) average case
3846    /// - No query planning overhead
3847    ///
3848    /// # Example
3849    ///
3850    /// ```no_run
3851    /// # use grafeo_engine::GrafeoDB;
3852    /// # use grafeo_common::types::Value;
3853    /// # let db = GrafeoDB::new_in_memory();
3854    /// let session = db.session();
3855    /// let id = session.create_node_with_props(&["Person"], [("name", "Alix".into())]);
3856    ///
3857    /// // Direct property access - O(1)
3858    /// let name = session.get_node_property(id, "name");
3859    /// assert_eq!(name, Some(Value::String("Alix".into())));
3860    /// ```
3861    #[must_use]
3862    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
3863        self.get_node(id)
3864            .and_then(|node| node.get_property(key).cloned())
3865    }
3866
3867    /// Gets an edge by ID directly, bypassing query planning.
3868    ///
3869    /// # Performance
3870    ///
3871    /// - Time complexity: O(1) average case
3872    /// - No lock contention
3873    #[must_use]
3874    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
3875        let (epoch, transaction_id) = self.get_transaction_context();
3876        self.active_lpg_store().get_edge_versioned(
3877            id,
3878            epoch,
3879            transaction_id.unwrap_or(TransactionId::SYSTEM),
3880        )
3881    }
3882
3883    /// Gets outgoing neighbors of a node directly, bypassing query planning.
3884    ///
3885    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
3886    ///
3887    /// # Performance
3888    ///
3889    /// - Time complexity: O(degree) where degree is the number of outgoing edges
3890    /// - Uses adjacency index for direct access
3891    /// - ~10-20x faster than equivalent MATCH query
3892    ///
3893    /// # Example
3894    ///
3895    /// ```no_run
3896    /// # use grafeo_engine::GrafeoDB;
3897    /// # let db = GrafeoDB::new_in_memory();
3898    /// let session = db.session();
3899    /// let alix = session.create_node(&["Person"]);
3900    /// let gus = session.create_node(&["Person"]);
3901    /// session.create_edge(alix, gus, "KNOWS");
3902    ///
3903    /// // Direct neighbor lookup - O(degree)
3904    /// let neighbors = session.get_neighbors_outgoing(alix);
3905    /// assert_eq!(neighbors.len(), 1);
3906    /// assert_eq!(neighbors[0].0, gus);
3907    /// ```
3908    #[must_use]
3909    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
3910        self.active_lpg_store()
3911            .edges_from(node, Direction::Outgoing)
3912            .collect()
3913    }
3914
3915    /// Gets incoming neighbors of a node directly, bypassing query planning.
3916    ///
3917    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
3918    ///
3919    /// # Performance
3920    ///
3921    /// - Time complexity: O(degree) where degree is the number of incoming edges
3922    /// - Uses backward adjacency index for direct access
3923    #[must_use]
3924    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
3925        self.active_lpg_store()
3926            .edges_from(node, Direction::Incoming)
3927            .collect()
3928    }
3929
3930    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
3931    ///
3932    /// # Example
3933    ///
3934    /// ```no_run
3935    /// # use grafeo_engine::GrafeoDB;
3936    /// # let db = GrafeoDB::new_in_memory();
3937    /// # let session = db.session();
3938    /// # let alix = session.create_node(&["Person"]);
3939    /// let neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
3940    /// ```
3941    #[must_use]
3942    pub fn get_neighbors_outgoing_by_type(
3943        &self,
3944        node: NodeId,
3945        edge_type: &str,
3946    ) -> Vec<(NodeId, EdgeId)> {
3947        self.active_lpg_store()
3948            .edges_from(node, Direction::Outgoing)
3949            .filter(|(_, edge_id)| {
3950                self.get_edge(*edge_id)
3951                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
3952            })
3953            .collect()
3954    }
3955
3956    /// Checks if a node exists, bypassing query planning.
3957    ///
3958    /// # Performance
3959    ///
3960    /// - Time complexity: O(1)
3961    /// - Fastest existence check available
3962    #[must_use]
3963    pub fn node_exists(&self, id: NodeId) -> bool {
3964        self.get_node(id).is_some()
3965    }
3966
3967    /// Checks if an edge exists, bypassing query planning.
3968    #[must_use]
3969    pub fn edge_exists(&self, id: EdgeId) -> bool {
3970        self.get_edge(id).is_some()
3971    }
3972
3973    /// Gets the degree (number of edges) of a node.
3974    ///
3975    /// Returns (outgoing_degree, incoming_degree).
3976    #[must_use]
3977    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
3978        let active = self.active_lpg_store();
3979        let out = active.out_degree(node);
3980        let in_degree = active.in_degree(node);
3981        (out, in_degree)
3982    }
3983
3984    /// Batch lookup of multiple nodes by ID.
3985    ///
3986    /// More efficient than calling `get_node()` in a loop because it
3987    /// amortizes overhead.
3988    ///
3989    /// # Performance
3990    ///
3991    /// - Time complexity: O(n) where n is the number of IDs
3992    /// - Better cache utilization than individual lookups
3993    #[must_use]
3994    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
3995        let (epoch, transaction_id) = self.get_transaction_context();
3996        let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
3997        let active = self.active_lpg_store();
3998        ids.iter()
3999            .map(|&id| active.get_node_versioned(id, epoch, tx))
4000            .collect()
4001    }
4002
4003    // ── Change Data Capture ─────────────────────────────────────────────
4004
4005    /// Returns the full change history for an entity (node or edge).
4006    #[cfg(feature = "cdc")]
4007    pub fn history(
4008        &self,
4009        entity_id: impl Into<crate::cdc::EntityId>,
4010    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4011        Ok(self.cdc_log.history(entity_id.into()))
4012    }
4013
4014    /// Returns change events for an entity since the given epoch.
4015    #[cfg(feature = "cdc")]
4016    pub fn history_since(
4017        &self,
4018        entity_id: impl Into<crate::cdc::EntityId>,
4019        since_epoch: EpochId,
4020    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4021        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
4022    }
4023
4024    /// Returns all change events across all entities in an epoch range.
4025    #[cfg(feature = "cdc")]
4026    pub fn changes_between(
4027        &self,
4028        start_epoch: EpochId,
4029        end_epoch: EpochId,
4030    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4031        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
4032    }
4033}
4034
4035impl Drop for Session {
4036    fn drop(&mut self) {
4037        // Auto-rollback any active transaction to prevent leaked MVCC state,
4038        // dangling write locks, and uncommitted versions lingering in the store.
4039        if self.in_transaction() {
4040            let _ = self.rollback_inner();
4041        }
4042
4043        #[cfg(feature = "metrics")]
4044        if let Some(ref reg) = self.metrics {
4045            reg.session_active
4046                .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
4047        }
4048    }
4049}
4050
4051#[cfg(test)]
4052mod tests {
4053    use super::parse_default_literal;
4054    use crate::database::GrafeoDB;
4055    use grafeo_common::types::Value;
4056
4057    // -----------------------------------------------------------------------
4058    // parse_default_literal
4059    // -----------------------------------------------------------------------
4060
4061    #[test]
4062    fn parse_default_literal_null() {
4063        assert_eq!(parse_default_literal("null"), Value::Null);
4064        assert_eq!(parse_default_literal("NULL"), Value::Null);
4065        assert_eq!(parse_default_literal("Null"), Value::Null);
4066    }
4067
4068    #[test]
4069    fn parse_default_literal_bool() {
4070        assert_eq!(parse_default_literal("true"), Value::Bool(true));
4071        assert_eq!(parse_default_literal("TRUE"), Value::Bool(true));
4072        assert_eq!(parse_default_literal("false"), Value::Bool(false));
4073        assert_eq!(parse_default_literal("FALSE"), Value::Bool(false));
4074    }
4075
4076    #[test]
4077    fn parse_default_literal_string_single_quoted() {
4078        assert_eq!(
4079            parse_default_literal("'hello'"),
4080            Value::String("hello".into())
4081        );
4082    }
4083
4084    #[test]
4085    fn parse_default_literal_string_double_quoted() {
4086        assert_eq!(
4087            parse_default_literal("\"world\""),
4088            Value::String("world".into())
4089        );
4090    }
4091
4092    #[test]
4093    fn parse_default_literal_integer() {
4094        assert_eq!(parse_default_literal("42"), Value::Int64(42));
4095        assert_eq!(parse_default_literal("-7"), Value::Int64(-7));
4096        assert_eq!(parse_default_literal("0"), Value::Int64(0));
4097    }
4098
4099    #[test]
4100    fn parse_default_literal_float() {
4101        assert_eq!(parse_default_literal("9.81"), Value::Float64(9.81_f64));
4102        assert_eq!(parse_default_literal("-0.5"), Value::Float64(-0.5));
4103    }
4104
4105    #[test]
4106    fn parse_default_literal_fallback_string() {
4107        // Not a recognized literal, not quoted, not a number
4108        assert_eq!(
4109            parse_default_literal("some_identifier"),
4110            Value::String("some_identifier".into())
4111        );
4112    }
4113
4114    #[test]
4115    fn test_session_create_node() {
4116        let db = GrafeoDB::new_in_memory();
4117        let session = db.session();
4118
4119        let id = session.create_node(&["Person"]);
4120        assert!(id.is_valid());
4121        assert_eq!(db.node_count(), 1);
4122    }
4123
4124    #[test]
4125    fn test_session_transaction() {
4126        let db = GrafeoDB::new_in_memory();
4127        let mut session = db.session();
4128
4129        assert!(!session.in_transaction());
4130
4131        session.begin_transaction().unwrap();
4132        assert!(session.in_transaction());
4133
4134        session.commit().unwrap();
4135        assert!(!session.in_transaction());
4136    }
4137
4138    #[test]
4139    fn test_session_transaction_context() {
4140        let db = GrafeoDB::new_in_memory();
4141        let mut session = db.session();
4142
4143        // Without transaction - context should have current epoch and no transaction_id
4144        let (_epoch1, transaction_id1) = session.get_transaction_context();
4145        assert!(transaction_id1.is_none());
4146
4147        // Start a transaction
4148        session.begin_transaction().unwrap();
4149        let (epoch2, transaction_id2) = session.get_transaction_context();
4150        assert!(transaction_id2.is_some());
4151        // Transaction should have a valid epoch
4152        let _ = epoch2; // Use the variable
4153
4154        // Commit and verify
4155        session.commit().unwrap();
4156        let (epoch3, tx_id3) = session.get_transaction_context();
4157        assert!(tx_id3.is_none());
4158        // Epoch should have advanced after commit
4159        assert!(epoch3.as_u64() >= epoch2.as_u64());
4160    }
4161
4162    #[test]
4163    fn test_session_rollback() {
4164        let db = GrafeoDB::new_in_memory();
4165        let mut session = db.session();
4166
4167        session.begin_transaction().unwrap();
4168        session.rollback().unwrap();
4169        assert!(!session.in_transaction());
4170    }
4171
4172    #[test]
4173    fn test_session_rollback_discards_versions() {
4174        use grafeo_common::types::TransactionId;
4175
4176        let db = GrafeoDB::new_in_memory();
4177
4178        // Create a node outside of any transaction (at system level)
4179        let node_before = db.store().create_node(&["Person"]);
4180        assert!(node_before.is_valid());
4181        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4182
4183        // Start a transaction
4184        let mut session = db.session();
4185        session.begin_transaction().unwrap();
4186        let transaction_id = session.current_transaction.lock().unwrap();
4187
4188        // Create a node versioned with the transaction's ID
4189        let epoch = db.store().current_epoch();
4190        let node_in_tx = db
4191            .store()
4192            .create_node_versioned(&["Person"], epoch, transaction_id);
4193        assert!(node_in_tx.is_valid());
4194
4195        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4196        // non-versioned lookups like node_count(). Verify the node is visible
4197        // only through the owning transaction.
4198        assert_eq!(
4199            db.node_count(),
4200            1,
4201            "PENDING nodes should be invisible to non-versioned node_count()"
4202        );
4203        assert!(
4204            db.store()
4205                .get_node_versioned(node_in_tx, epoch, transaction_id)
4206                .is_some(),
4207            "Transaction node should be visible to its own transaction"
4208        );
4209
4210        // Rollback the transaction
4211        session.rollback().unwrap();
4212        assert!(!session.in_transaction());
4213
4214        // The node created in the transaction should be discarded
4215        // Only the first node should remain visible
4216        let count_after = db.node_count();
4217        assert_eq!(
4218            count_after, 1,
4219            "Rollback should discard uncommitted node, but got {count_after}"
4220        );
4221
4222        // The original node should still be accessible
4223        let current_epoch = db.store().current_epoch();
4224        assert!(
4225            db.store()
4226                .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
4227                .is_some(),
4228            "Original node should still exist"
4229        );
4230
4231        // The node created in the transaction should not be accessible
4232        assert!(
4233            db.store()
4234                .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
4235                .is_none(),
4236            "Transaction node should be gone"
4237        );
4238    }
4239
4240    #[test]
4241    fn test_session_create_node_in_transaction() {
4242        // Test that session.create_node() is transaction-aware
4243        let db = GrafeoDB::new_in_memory();
4244
4245        // Create a node outside of any transaction
4246        let node_before = db.create_node(&["Person"]);
4247        assert!(node_before.is_valid());
4248        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4249
4250        // Start a transaction and create a node through the session
4251        let mut session = db.session();
4252        session.begin_transaction().unwrap();
4253        let transaction_id = session.current_transaction.lock().unwrap();
4254
4255        // Create a node through session.create_node() - should be versioned with tx
4256        let node_in_tx = session.create_node(&["Person"]);
4257        assert!(node_in_tx.is_valid());
4258
4259        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4260        // non-versioned lookups. Verify the node is visible only to its own tx.
4261        assert_eq!(
4262            db.node_count(),
4263            1,
4264            "PENDING nodes should be invisible to non-versioned node_count()"
4265        );
4266        let epoch = db.store().current_epoch();
4267        assert!(
4268            db.store()
4269                .get_node_versioned(node_in_tx, epoch, transaction_id)
4270                .is_some(),
4271            "Transaction node should be visible to its own transaction"
4272        );
4273
4274        // Rollback the transaction
4275        session.rollback().unwrap();
4276
4277        // The node created via session.create_node() should be discarded
4278        let count_after = db.node_count();
4279        assert_eq!(
4280            count_after, 1,
4281            "Rollback should discard node created via session.create_node(), but got {count_after}"
4282        );
4283    }
4284
4285    #[test]
4286    fn test_session_create_node_with_props_in_transaction() {
4287        use grafeo_common::types::Value;
4288
4289        // Test that session.create_node_with_props() is transaction-aware
4290        let db = GrafeoDB::new_in_memory();
4291
4292        // Create a node outside of any transaction
4293        db.create_node(&["Person"]);
4294        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4295
4296        // Start a transaction and create a node with properties
4297        let mut session = db.session();
4298        session.begin_transaction().unwrap();
4299        let transaction_id = session.current_transaction.lock().unwrap();
4300
4301        let node_in_tx =
4302            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4303        assert!(node_in_tx.is_valid());
4304
4305        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4306        // non-versioned lookups. Verify the node is visible only to its own tx.
4307        assert_eq!(
4308            db.node_count(),
4309            1,
4310            "PENDING nodes should be invisible to non-versioned node_count()"
4311        );
4312        let epoch = db.store().current_epoch();
4313        assert!(
4314            db.store()
4315                .get_node_versioned(node_in_tx, epoch, transaction_id)
4316                .is_some(),
4317            "Transaction node should be visible to its own transaction"
4318        );
4319
4320        // Rollback the transaction
4321        session.rollback().unwrap();
4322
4323        // The node should be discarded
4324        let count_after = db.node_count();
4325        assert_eq!(
4326            count_after, 1,
4327            "Rollback should discard node created via session.create_node_with_props()"
4328        );
4329    }
4330
4331    #[cfg(feature = "gql")]
4332    mod gql_tests {
4333        use super::*;
4334
4335        #[test]
4336        fn test_gql_query_execution() {
4337            let db = GrafeoDB::new_in_memory();
4338            let session = db.session();
4339
4340            // Create some test data
4341            session.create_node(&["Person"]);
4342            session.create_node(&["Person"]);
4343            session.create_node(&["Animal"]);
4344
4345            // Execute a GQL query
4346            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
4347
4348            // Should return 2 Person nodes
4349            assert_eq!(result.row_count(), 2);
4350            assert_eq!(result.column_count(), 1);
4351            assert_eq!(result.columns[0], "n");
4352        }
4353
4354        #[test]
4355        fn test_gql_empty_result() {
4356            let db = GrafeoDB::new_in_memory();
4357            let session = db.session();
4358
4359            // No data in database
4360            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
4361
4362            assert_eq!(result.row_count(), 0);
4363        }
4364
4365        #[test]
4366        fn test_gql_parse_error() {
4367            let db = GrafeoDB::new_in_memory();
4368            let session = db.session();
4369
4370            // Invalid GQL syntax
4371            let result = session.execute("MATCH (n RETURN n");
4372
4373            assert!(result.is_err());
4374        }
4375
4376        #[test]
4377        fn test_gql_relationship_traversal() {
4378            let db = GrafeoDB::new_in_memory();
4379            let session = db.session();
4380
4381            // Create a graph: Alix -> Gus, Alix -> Vincent
4382            let alix = session.create_node(&["Person"]);
4383            let gus = session.create_node(&["Person"]);
4384            let vincent = session.create_node(&["Person"]);
4385
4386            session.create_edge(alix, gus, "KNOWS");
4387            session.create_edge(alix, vincent, "KNOWS");
4388
4389            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
4390            let result = session
4391                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4392                .unwrap();
4393
4394            // Should return 2 rows (Alix->Gus, Alix->Vincent)
4395            assert_eq!(result.row_count(), 2);
4396            assert_eq!(result.column_count(), 2);
4397            assert_eq!(result.columns[0], "a");
4398            assert_eq!(result.columns[1], "b");
4399        }
4400
4401        #[test]
4402        fn test_gql_relationship_with_type_filter() {
4403            let db = GrafeoDB::new_in_memory();
4404            let session = db.session();
4405
4406            // Create a graph: Alix -KNOWS-> Gus, Alix -WORKS_WITH-> Vincent
4407            let alix = session.create_node(&["Person"]);
4408            let gus = session.create_node(&["Person"]);
4409            let vincent = session.create_node(&["Person"]);
4410
4411            session.create_edge(alix, gus, "KNOWS");
4412            session.create_edge(alix, vincent, "WORKS_WITH");
4413
4414            // Query only KNOWS relationships
4415            let result = session
4416                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4417                .unwrap();
4418
4419            // Should return only 1 row (Alix->Gus)
4420            assert_eq!(result.row_count(), 1);
4421        }
4422
4423        #[test]
4424        fn test_gql_semantic_error_undefined_variable() {
4425            let db = GrafeoDB::new_in_memory();
4426            let session = db.session();
4427
4428            // Reference undefined variable 'x' in RETURN
4429            let result = session.execute("MATCH (n:Person) RETURN x");
4430
4431            // Should fail with semantic error
4432            assert!(result.is_err());
4433            let Err(err) = result else {
4434                panic!("Expected error")
4435            };
4436            assert!(
4437                err.to_string().contains("Undefined variable"),
4438                "Expected undefined variable error, got: {}",
4439                err
4440            );
4441        }
4442
4443        #[test]
4444        fn test_gql_where_clause_property_filter() {
4445            use grafeo_common::types::Value;
4446
4447            let db = GrafeoDB::new_in_memory();
4448            let session = db.session();
4449
4450            // Create people with ages
4451            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
4452            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
4453            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
4454
4455            // Query with WHERE clause: age > 30
4456            let result = session
4457                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
4458                .unwrap();
4459
4460            // Should return 2 people (ages 35 and 45)
4461            assert_eq!(result.row_count(), 2);
4462        }
4463
4464        #[test]
4465        fn test_gql_where_clause_equality() {
4466            use grafeo_common::types::Value;
4467
4468            let db = GrafeoDB::new_in_memory();
4469            let session = db.session();
4470
4471            // Create people with names
4472            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4473            session.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
4474            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4475
4476            // Query with WHERE clause: name = "Alix"
4477            let result = session
4478                .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
4479                .unwrap();
4480
4481            // Should return 2 people named Alix
4482            assert_eq!(result.row_count(), 2);
4483        }
4484
4485        #[test]
4486        fn test_gql_return_property_access() {
4487            use grafeo_common::types::Value;
4488
4489            let db = GrafeoDB::new_in_memory();
4490            let session = db.session();
4491
4492            // Create people with names and ages
4493            session.create_node_with_props(
4494                &["Person"],
4495                [
4496                    ("name", Value::String("Alix".into())),
4497                    ("age", Value::Int64(30)),
4498                ],
4499            );
4500            session.create_node_with_props(
4501                &["Person"],
4502                [
4503                    ("name", Value::String("Gus".into())),
4504                    ("age", Value::Int64(25)),
4505                ],
4506            );
4507
4508            // Query returning properties
4509            let result = session
4510                .execute("MATCH (n:Person) RETURN n.name, n.age")
4511                .unwrap();
4512
4513            // Should return 2 rows with name and age columns
4514            assert_eq!(result.row_count(), 2);
4515            assert_eq!(result.column_count(), 2);
4516            assert_eq!(result.columns[0], "n.name");
4517            assert_eq!(result.columns[1], "n.age");
4518
4519            // Check that we get actual values
4520            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
4521            assert!(names.contains(&&Value::String("Alix".into())));
4522            assert!(names.contains(&&Value::String("Gus".into())));
4523        }
4524
4525        #[test]
4526        fn test_gql_return_mixed_expressions() {
4527            use grafeo_common::types::Value;
4528
4529            let db = GrafeoDB::new_in_memory();
4530            let session = db.session();
4531
4532            // Create a person
4533            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4534
4535            // Query returning both node and property
4536            let result = session
4537                .execute("MATCH (n:Person) RETURN n, n.name")
4538                .unwrap();
4539
4540            assert_eq!(result.row_count(), 1);
4541            assert_eq!(result.column_count(), 2);
4542            assert_eq!(result.columns[0], "n");
4543            assert_eq!(result.columns[1], "n.name");
4544
4545            // Second column should be the name
4546            assert_eq!(result.rows[0][1], Value::String("Alix".into()));
4547        }
4548    }
4549
4550    #[cfg(feature = "cypher")]
4551    mod cypher_tests {
4552        use super::*;
4553
4554        #[test]
4555        fn test_cypher_query_execution() {
4556            let db = GrafeoDB::new_in_memory();
4557            let session = db.session();
4558
4559            // Create some test data
4560            session.create_node(&["Person"]);
4561            session.create_node(&["Person"]);
4562            session.create_node(&["Animal"]);
4563
4564            // Execute a Cypher query
4565            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4566
4567            // Should return 2 Person nodes
4568            assert_eq!(result.row_count(), 2);
4569            assert_eq!(result.column_count(), 1);
4570            assert_eq!(result.columns[0], "n");
4571        }
4572
4573        #[test]
4574        fn test_cypher_empty_result() {
4575            let db = GrafeoDB::new_in_memory();
4576            let session = db.session();
4577
4578            // No data in database
4579            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4580
4581            assert_eq!(result.row_count(), 0);
4582        }
4583
4584        #[test]
4585        fn test_cypher_parse_error() {
4586            let db = GrafeoDB::new_in_memory();
4587            let session = db.session();
4588
4589            // Invalid Cypher syntax
4590            let result = session.execute_cypher("MATCH (n RETURN n");
4591
4592            assert!(result.is_err());
4593        }
4594    }
4595
4596    // ==================== Direct Lookup API Tests ====================
4597
4598    mod direct_lookup_tests {
4599        use super::*;
4600        use grafeo_common::types::Value;
4601
4602        #[test]
4603        fn test_get_node() {
4604            let db = GrafeoDB::new_in_memory();
4605            let session = db.session();
4606
4607            let id = session.create_node(&["Person"]);
4608            let node = session.get_node(id);
4609
4610            assert!(node.is_some());
4611            let node = node.unwrap();
4612            assert_eq!(node.id, id);
4613        }
4614
4615        #[test]
4616        fn test_get_node_not_found() {
4617            use grafeo_common::types::NodeId;
4618
4619            let db = GrafeoDB::new_in_memory();
4620            let session = db.session();
4621
4622            // Try to get a non-existent node
4623            let node = session.get_node(NodeId::new(9999));
4624            assert!(node.is_none());
4625        }
4626
4627        #[test]
4628        fn test_get_node_property() {
4629            let db = GrafeoDB::new_in_memory();
4630            let session = db.session();
4631
4632            let id = session
4633                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4634
4635            let name = session.get_node_property(id, "name");
4636            assert_eq!(name, Some(Value::String("Alix".into())));
4637
4638            // Non-existent property
4639            let missing = session.get_node_property(id, "missing");
4640            assert!(missing.is_none());
4641        }
4642
4643        #[test]
4644        fn test_get_edge() {
4645            let db = GrafeoDB::new_in_memory();
4646            let session = db.session();
4647
4648            let alix = session.create_node(&["Person"]);
4649            let gus = session.create_node(&["Person"]);
4650            let edge_id = session.create_edge(alix, gus, "KNOWS");
4651
4652            let edge = session.get_edge(edge_id);
4653            assert!(edge.is_some());
4654            let edge = edge.unwrap();
4655            assert_eq!(edge.id, edge_id);
4656            assert_eq!(edge.src, alix);
4657            assert_eq!(edge.dst, gus);
4658        }
4659
4660        #[test]
4661        fn test_get_edge_not_found() {
4662            use grafeo_common::types::EdgeId;
4663
4664            let db = GrafeoDB::new_in_memory();
4665            let session = db.session();
4666
4667            let edge = session.get_edge(EdgeId::new(9999));
4668            assert!(edge.is_none());
4669        }
4670
4671        #[test]
4672        fn test_get_neighbors_outgoing() {
4673            let db = GrafeoDB::new_in_memory();
4674            let session = db.session();
4675
4676            let alix = session.create_node(&["Person"]);
4677            let gus = session.create_node(&["Person"]);
4678            let harm = session.create_node(&["Person"]);
4679
4680            session.create_edge(alix, gus, "KNOWS");
4681            session.create_edge(alix, harm, "KNOWS");
4682
4683            let neighbors = session.get_neighbors_outgoing(alix);
4684            assert_eq!(neighbors.len(), 2);
4685
4686            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4687            assert!(neighbor_ids.contains(&gus));
4688            assert!(neighbor_ids.contains(&harm));
4689        }
4690
4691        #[test]
4692        fn test_get_neighbors_incoming() {
4693            let db = GrafeoDB::new_in_memory();
4694            let session = db.session();
4695
4696            let alix = session.create_node(&["Person"]);
4697            let gus = session.create_node(&["Person"]);
4698            let harm = session.create_node(&["Person"]);
4699
4700            session.create_edge(gus, alix, "KNOWS");
4701            session.create_edge(harm, alix, "KNOWS");
4702
4703            let neighbors = session.get_neighbors_incoming(alix);
4704            assert_eq!(neighbors.len(), 2);
4705
4706            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4707            assert!(neighbor_ids.contains(&gus));
4708            assert!(neighbor_ids.contains(&harm));
4709        }
4710
4711        #[test]
4712        fn test_get_neighbors_outgoing_by_type() {
4713            let db = GrafeoDB::new_in_memory();
4714            let session = db.session();
4715
4716            let alix = session.create_node(&["Person"]);
4717            let gus = session.create_node(&["Person"]);
4718            let company = session.create_node(&["Company"]);
4719
4720            session.create_edge(alix, gus, "KNOWS");
4721            session.create_edge(alix, company, "WORKS_AT");
4722
4723            let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
4724            assert_eq!(knows_neighbors.len(), 1);
4725            assert_eq!(knows_neighbors[0].0, gus);
4726
4727            let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
4728            assert_eq!(works_neighbors.len(), 1);
4729            assert_eq!(works_neighbors[0].0, company);
4730
4731            // No edges of this type
4732            let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
4733            assert!(no_neighbors.is_empty());
4734        }
4735
4736        #[test]
4737        fn test_node_exists() {
4738            use grafeo_common::types::NodeId;
4739
4740            let db = GrafeoDB::new_in_memory();
4741            let session = db.session();
4742
4743            let id = session.create_node(&["Person"]);
4744
4745            assert!(session.node_exists(id));
4746            assert!(!session.node_exists(NodeId::new(9999)));
4747        }
4748
4749        #[test]
4750        fn test_edge_exists() {
4751            use grafeo_common::types::EdgeId;
4752
4753            let db = GrafeoDB::new_in_memory();
4754            let session = db.session();
4755
4756            let alix = session.create_node(&["Person"]);
4757            let gus = session.create_node(&["Person"]);
4758            let edge_id = session.create_edge(alix, gus, "KNOWS");
4759
4760            assert!(session.edge_exists(edge_id));
4761            assert!(!session.edge_exists(EdgeId::new(9999)));
4762        }
4763
4764        #[test]
4765        fn test_get_degree() {
4766            let db = GrafeoDB::new_in_memory();
4767            let session = db.session();
4768
4769            let alix = session.create_node(&["Person"]);
4770            let gus = session.create_node(&["Person"]);
4771            let harm = session.create_node(&["Person"]);
4772
4773            // Alix knows Gus and Harm (2 outgoing)
4774            session.create_edge(alix, gus, "KNOWS");
4775            session.create_edge(alix, harm, "KNOWS");
4776            // Gus knows Alix (1 incoming for Alix)
4777            session.create_edge(gus, alix, "KNOWS");
4778
4779            let (out_degree, in_degree) = session.get_degree(alix);
4780            assert_eq!(out_degree, 2);
4781            assert_eq!(in_degree, 1);
4782
4783            // Node with no edges
4784            let lonely = session.create_node(&["Person"]);
4785            let (out, in_deg) = session.get_degree(lonely);
4786            assert_eq!(out, 0);
4787            assert_eq!(in_deg, 0);
4788        }
4789
4790        #[test]
4791        fn test_get_nodes_batch() {
4792            let db = GrafeoDB::new_in_memory();
4793            let session = db.session();
4794
4795            let alix = session.create_node(&["Person"]);
4796            let gus = session.create_node(&["Person"]);
4797            let harm = session.create_node(&["Person"]);
4798
4799            let nodes = session.get_nodes_batch(&[alix, gus, harm]);
4800            assert_eq!(nodes.len(), 3);
4801            assert!(nodes[0].is_some());
4802            assert!(nodes[1].is_some());
4803            assert!(nodes[2].is_some());
4804
4805            // With non-existent node
4806            use grafeo_common::types::NodeId;
4807            let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
4808            assert_eq!(nodes_with_missing.len(), 3);
4809            assert!(nodes_with_missing[0].is_some());
4810            assert!(nodes_with_missing[1].is_none()); // Missing node
4811            assert!(nodes_with_missing[2].is_some());
4812        }
4813
4814        #[test]
4815        fn test_auto_commit_setting() {
4816            let db = GrafeoDB::new_in_memory();
4817            let mut session = db.session();
4818
4819            // Default is auto-commit enabled
4820            assert!(session.auto_commit());
4821
4822            session.set_auto_commit(false);
4823            assert!(!session.auto_commit());
4824
4825            session.set_auto_commit(true);
4826            assert!(session.auto_commit());
4827        }
4828
4829        #[test]
4830        fn test_transaction_double_begin_nests() {
4831            let db = GrafeoDB::new_in_memory();
4832            let mut session = db.session();
4833
4834            session.begin_transaction().unwrap();
4835            // Second begin_transaction creates a nested transaction (auto-savepoint)
4836            let result = session.begin_transaction();
4837            assert!(result.is_ok());
4838            // Commit the inner (releases savepoint)
4839            session.commit().unwrap();
4840            // Commit the outer
4841            session.commit().unwrap();
4842        }
4843
4844        #[test]
4845        fn test_commit_without_transaction_error() {
4846            let db = GrafeoDB::new_in_memory();
4847            let mut session = db.session();
4848
4849            let result = session.commit();
4850            assert!(result.is_err());
4851        }
4852
4853        #[test]
4854        fn test_rollback_without_transaction_error() {
4855            let db = GrafeoDB::new_in_memory();
4856            let mut session = db.session();
4857
4858            let result = session.rollback();
4859            assert!(result.is_err());
4860        }
4861
4862        #[test]
4863        fn test_create_edge_in_transaction() {
4864            let db = GrafeoDB::new_in_memory();
4865            let mut session = db.session();
4866
4867            // Create nodes outside transaction
4868            let alix = session.create_node(&["Person"]);
4869            let gus = session.create_node(&["Person"]);
4870
4871            // Create edge in transaction
4872            session.begin_transaction().unwrap();
4873            let edge_id = session.create_edge(alix, gus, "KNOWS");
4874
4875            // Edge should be visible in the transaction
4876            assert!(session.edge_exists(edge_id));
4877
4878            // Commit
4879            session.commit().unwrap();
4880
4881            // Edge should still be visible
4882            assert!(session.edge_exists(edge_id));
4883        }
4884
4885        #[test]
4886        fn test_neighbors_empty_node() {
4887            let db = GrafeoDB::new_in_memory();
4888            let session = db.session();
4889
4890            let lonely = session.create_node(&["Person"]);
4891
4892            assert!(session.get_neighbors_outgoing(lonely).is_empty());
4893            assert!(session.get_neighbors_incoming(lonely).is_empty());
4894            assert!(
4895                session
4896                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
4897                    .is_empty()
4898            );
4899        }
4900    }
4901
4902    #[test]
4903    fn test_auto_gc_triggers_on_commit_interval() {
4904        use crate::config::Config;
4905
4906        let config = Config::in_memory().with_gc_interval(2);
4907        let db = GrafeoDB::with_config(config).unwrap();
4908        let mut session = db.session();
4909
4910        // First commit: counter = 1, no GC (not a multiple of 2)
4911        session.begin_transaction().unwrap();
4912        session.create_node(&["A"]);
4913        session.commit().unwrap();
4914
4915        // Second commit: counter = 2, GC should trigger (multiple of 2)
4916        session.begin_transaction().unwrap();
4917        session.create_node(&["B"]);
4918        session.commit().unwrap();
4919
4920        // Verify the database is still functional after GC
4921        assert_eq!(db.node_count(), 2);
4922    }
4923
4924    #[test]
4925    fn test_query_timeout_config_propagates_to_session() {
4926        use crate::config::Config;
4927        use std::time::Duration;
4928
4929        let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
4930        let db = GrafeoDB::with_config(config).unwrap();
4931        let session = db.session();
4932
4933        // Verify the session has a query deadline (timeout was set)
4934        assert!(session.query_deadline().is_some());
4935    }
4936
4937    #[test]
4938    fn test_no_query_timeout_returns_no_deadline() {
4939        let db = GrafeoDB::new_in_memory();
4940        let session = db.session();
4941
4942        // Default config has no timeout
4943        assert!(session.query_deadline().is_none());
4944    }
4945
4946    #[test]
4947    fn test_graph_model_accessor() {
4948        use crate::config::GraphModel;
4949
4950        let db = GrafeoDB::new_in_memory();
4951        let session = db.session();
4952
4953        assert_eq!(session.graph_model(), GraphModel::Lpg);
4954    }
4955
4956    #[cfg(feature = "gql")]
4957    #[test]
4958    fn test_external_store_session() {
4959        use grafeo_core::graph::GraphStoreMut;
4960        use std::sync::Arc;
4961
4962        let config = crate::config::Config::in_memory();
4963        let store =
4964            Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
4965        let db = GrafeoDB::with_store(store, config).unwrap();
4966
4967        let mut session = db.session();
4968
4969        // Use an explicit transaction so that INSERT and MATCH share the same
4970        // transaction context. With PENDING epochs, uncommitted versions are
4971        // only visible to the owning transaction.
4972        session.begin_transaction().unwrap();
4973        session.execute("INSERT (:Test {name: 'hello'})").unwrap();
4974
4975        // Verify we can query through it within the same transaction
4976        let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
4977        assert_eq!(result.row_count(), 1);
4978
4979        session.commit().unwrap();
4980    }
4981
4982    // ==================== Session Command Tests ====================
4983
4984    #[cfg(feature = "gql")]
4985    mod session_command_tests {
4986        use super::*;
4987        use grafeo_common::types::Value;
4988
4989        #[test]
4990        fn test_use_graph_sets_current_graph() {
4991            let db = GrafeoDB::new_in_memory();
4992            let session = db.session();
4993
4994            // Create the graph first, then USE it
4995            session.execute("CREATE GRAPH mydb").unwrap();
4996            session.execute("USE GRAPH mydb").unwrap();
4997
4998            assert_eq!(session.current_graph(), Some("mydb".to_string()));
4999        }
5000
5001        #[test]
5002        fn test_use_graph_nonexistent_errors() {
5003            let db = GrafeoDB::new_in_memory();
5004            let session = db.session();
5005
5006            let result = session.execute("USE GRAPH doesnotexist");
5007            assert!(result.is_err());
5008            let err = result.unwrap_err().to_string();
5009            assert!(
5010                err.contains("does not exist"),
5011                "Expected 'does not exist' error, got: {err}"
5012            );
5013        }
5014
5015        #[test]
5016        fn test_use_graph_default_always_valid() {
5017            let db = GrafeoDB::new_in_memory();
5018            let session = db.session();
5019
5020            // "default" is always valid, even without CREATE GRAPH
5021            session.execute("USE GRAPH default").unwrap();
5022            assert_eq!(session.current_graph(), Some("default".to_string()));
5023        }
5024
5025        #[test]
5026        fn test_session_set_graph() {
5027            let db = GrafeoDB::new_in_memory();
5028            let session = db.session();
5029
5030            session.execute("CREATE GRAPH analytics").unwrap();
5031            session.execute("SESSION SET GRAPH analytics").unwrap();
5032            assert_eq!(session.current_graph(), Some("analytics".to_string()));
5033        }
5034
5035        #[test]
5036        fn test_session_set_graph_nonexistent_errors() {
5037            let db = GrafeoDB::new_in_memory();
5038            let session = db.session();
5039
5040            let result = session.execute("SESSION SET GRAPH nosuchgraph");
5041            assert!(result.is_err());
5042        }
5043
5044        #[test]
5045        fn test_session_set_time_zone() {
5046            let db = GrafeoDB::new_in_memory();
5047            let session = db.session();
5048
5049            assert_eq!(session.time_zone(), None);
5050
5051            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5052            assert_eq!(session.time_zone(), Some("UTC".to_string()));
5053
5054            session
5055                .execute("SESSION SET TIME ZONE 'America/New_York'")
5056                .unwrap();
5057            assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
5058        }
5059
5060        #[test]
5061        fn test_session_set_parameter() {
5062            let db = GrafeoDB::new_in_memory();
5063            let session = db.session();
5064
5065            session
5066                .execute("SESSION SET PARAMETER $timeout = 30")
5067                .unwrap();
5068
5069            // Parameter is stored (value is Null for now, since expression
5070            // evaluation is not yet wired up)
5071            assert!(session.get_parameter("timeout").is_some());
5072        }
5073
5074        #[test]
5075        fn test_session_reset_clears_all_state() {
5076            let db = GrafeoDB::new_in_memory();
5077            let session = db.session();
5078
5079            // Set various session state
5080            session.execute("CREATE GRAPH analytics").unwrap();
5081            session.execute("SESSION SET GRAPH analytics").unwrap();
5082            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5083            session
5084                .execute("SESSION SET PARAMETER $limit = 100")
5085                .unwrap();
5086
5087            // Verify state was set
5088            assert!(session.current_graph().is_some());
5089            assert!(session.time_zone().is_some());
5090            assert!(session.get_parameter("limit").is_some());
5091
5092            // Reset everything
5093            session.execute("SESSION RESET").unwrap();
5094
5095            assert_eq!(session.current_graph(), None);
5096            assert_eq!(session.time_zone(), None);
5097            assert!(session.get_parameter("limit").is_none());
5098        }
5099
5100        #[test]
5101        fn test_session_close_clears_state() {
5102            let db = GrafeoDB::new_in_memory();
5103            let session = db.session();
5104
5105            session.execute("CREATE GRAPH analytics").unwrap();
5106            session.execute("SESSION SET GRAPH analytics").unwrap();
5107            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5108
5109            session.execute("SESSION CLOSE").unwrap();
5110
5111            assert_eq!(session.current_graph(), None);
5112            assert_eq!(session.time_zone(), None);
5113        }
5114
5115        #[test]
5116        fn test_create_graph() {
5117            let db = GrafeoDB::new_in_memory();
5118            let session = db.session();
5119
5120            session.execute("CREATE GRAPH mydb").unwrap();
5121
5122            // Should be able to USE it now
5123            session.execute("USE GRAPH mydb").unwrap();
5124            assert_eq!(session.current_graph(), Some("mydb".to_string()));
5125        }
5126
5127        #[test]
5128        fn test_create_graph_duplicate_errors() {
5129            let db = GrafeoDB::new_in_memory();
5130            let session = db.session();
5131
5132            session.execute("CREATE GRAPH mydb").unwrap();
5133            let result = session.execute("CREATE GRAPH mydb");
5134
5135            assert!(result.is_err());
5136            let err = result.unwrap_err().to_string();
5137            assert!(
5138                err.contains("already exists"),
5139                "Expected 'already exists' error, got: {err}"
5140            );
5141        }
5142
5143        #[test]
5144        fn test_create_graph_if_not_exists() {
5145            let db = GrafeoDB::new_in_memory();
5146            let session = db.session();
5147
5148            session.execute("CREATE GRAPH mydb").unwrap();
5149            // Should succeed silently with IF NOT EXISTS
5150            session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
5151        }
5152
5153        #[test]
5154        fn test_drop_graph() {
5155            let db = GrafeoDB::new_in_memory();
5156            let session = db.session();
5157
5158            session.execute("CREATE GRAPH mydb").unwrap();
5159            session.execute("DROP GRAPH mydb").unwrap();
5160
5161            // Should no longer be usable
5162            let result = session.execute("USE GRAPH mydb");
5163            assert!(result.is_err());
5164        }
5165
5166        #[test]
5167        fn test_drop_graph_nonexistent_errors() {
5168            let db = GrafeoDB::new_in_memory();
5169            let session = db.session();
5170
5171            let result = session.execute("DROP GRAPH nosuchgraph");
5172            assert!(result.is_err());
5173            let err = result.unwrap_err().to_string();
5174            assert!(
5175                err.contains("does not exist"),
5176                "Expected 'does not exist' error, got: {err}"
5177            );
5178        }
5179
5180        #[test]
5181        fn test_drop_graph_if_exists() {
5182            let db = GrafeoDB::new_in_memory();
5183            let session = db.session();
5184
5185            // Should succeed silently with IF EXISTS
5186            session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
5187        }
5188
5189        #[test]
5190        fn test_start_transaction_via_gql() {
5191            let db = GrafeoDB::new_in_memory();
5192            let session = db.session();
5193
5194            session.execute("START TRANSACTION").unwrap();
5195            assert!(session.in_transaction());
5196            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5197            session.execute("COMMIT").unwrap();
5198            assert!(!session.in_transaction());
5199
5200            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5201            assert_eq!(result.rows.len(), 1);
5202        }
5203
5204        #[test]
5205        fn test_start_transaction_read_only_blocks_insert() {
5206            let db = GrafeoDB::new_in_memory();
5207            let session = db.session();
5208
5209            session.execute("START TRANSACTION READ ONLY").unwrap();
5210            let result = session.execute("INSERT (:Person {name: 'Alix'})");
5211            assert!(result.is_err());
5212            let err = result.unwrap_err().to_string();
5213            assert!(
5214                err.contains("read-only"),
5215                "Expected read-only error, got: {err}"
5216            );
5217            session.execute("ROLLBACK").unwrap();
5218        }
5219
5220        #[test]
5221        fn test_start_transaction_read_only_allows_reads() {
5222            let db = GrafeoDB::new_in_memory();
5223            let mut session = db.session();
5224            session.begin_transaction().unwrap();
5225            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5226            session.commit().unwrap();
5227
5228            session.execute("START TRANSACTION READ ONLY").unwrap();
5229            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5230            assert_eq!(result.rows.len(), 1);
5231            session.execute("COMMIT").unwrap();
5232        }
5233
5234        #[test]
5235        fn test_rollback_via_gql() {
5236            let db = GrafeoDB::new_in_memory();
5237            let session = db.session();
5238
5239            session.execute("START TRANSACTION").unwrap();
5240            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5241            session.execute("ROLLBACK").unwrap();
5242
5243            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5244            assert!(result.rows.is_empty());
5245        }
5246
5247        #[test]
5248        fn test_start_transaction_with_isolation_level() {
5249            let db = GrafeoDB::new_in_memory();
5250            let session = db.session();
5251
5252            session
5253                .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
5254                .unwrap();
5255            assert!(session.in_transaction());
5256            session.execute("ROLLBACK").unwrap();
5257        }
5258
5259        #[test]
5260        fn test_session_commands_return_empty_result() {
5261            let db = GrafeoDB::new_in_memory();
5262            let session = db.session();
5263
5264            session.execute("CREATE GRAPH test").unwrap();
5265            let result = session.execute("SESSION SET GRAPH test").unwrap();
5266            assert_eq!(result.row_count(), 0);
5267            assert_eq!(result.column_count(), 0);
5268        }
5269
5270        #[test]
5271        fn test_current_graph_default_is_none() {
5272            let db = GrafeoDB::new_in_memory();
5273            let session = db.session();
5274
5275            assert_eq!(session.current_graph(), None);
5276        }
5277
5278        #[test]
5279        fn test_time_zone_default_is_none() {
5280            let db = GrafeoDB::new_in_memory();
5281            let session = db.session();
5282
5283            assert_eq!(session.time_zone(), None);
5284        }
5285
5286        #[test]
5287        fn test_session_state_independent_across_sessions() {
5288            let db = GrafeoDB::new_in_memory();
5289            let session1 = db.session();
5290            let session2 = db.session();
5291
5292            session1.execute("CREATE GRAPH first").unwrap();
5293            session1.execute("CREATE GRAPH second").unwrap();
5294            session1.execute("SESSION SET GRAPH first").unwrap();
5295            session2.execute("SESSION SET GRAPH second").unwrap();
5296
5297            assert_eq!(session1.current_graph(), Some("first".to_string()));
5298            assert_eq!(session2.current_graph(), Some("second".to_string()));
5299        }
5300
/// `SHOW NODE TYPES` returns the expected columns and one row for the created type.
#[test]
fn test_show_node_types() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let create_person = "CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)";
    sess.execute(create_person).unwrap();

    let listing = sess.execute("SHOW NODE TYPES").unwrap();

    let expected_columns = vec!["name", "properties", "constraints", "parents"];
    assert_eq!(listing.columns, expected_columns);
    assert_eq!(listing.rows.len(), 1);

    // The type name is reported in the first column of the only row.
    let type_name = &listing.rows[0][0];
    assert_eq!(*type_name, Value::from("Person"));
}
5319
/// `SHOW EDGE TYPES` returns the expected columns and one row for the created type.
#[test]
fn test_show_edge_types() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let create_knows =
        "CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)";
    sess.execute(create_knows).unwrap();

    let listing = sess.execute("SHOW EDGE TYPES").unwrap();

    let expected_columns = vec!["name", "properties", "source_types", "target_types"];
    assert_eq!(listing.columns, expected_columns);
    assert_eq!(listing.rows.len(), 1);

    // The type name is reported in the first column of the only row.
    let type_name = &listing.rows[0][0];
    assert_eq!(*type_name, Value::from("KNOWS"));
}
5337
/// `SHOW GRAPH TYPES` returns the expected columns and one row for the created graph type.
#[test]
fn test_show_graph_types() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // The node type is registered on its own before the graph type that mentions it.
    let create_node_type = "CREATE NODE TYPE Person (name STRING)";
    let create_graph_type = "CREATE GRAPH TYPE social (\
        NODE TYPE Person (name STRING)\
    )";
    sess.execute(create_node_type).unwrap();
    sess.execute(create_graph_type).unwrap();

    let listing = sess.execute("SHOW GRAPH TYPES").unwrap();

    let expected_columns = vec!["name", "open", "node_types", "edge_types"];
    assert_eq!(listing.columns, expected_columns);
    assert_eq!(listing.rows.len(), 1);

    // The graph type name is reported in the first column of the only row.
    let type_name = &listing.rows[0][0];
    assert_eq!(*type_name, Value::from("social"));
}
5362
/// `SHOW GRAPH TYPE <name>` returns a single row whose first column is that name.
#[test]
fn test_show_graph_type_named() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // The node type is registered on its own before the graph type that mentions it.
    let create_node_type = "CREATE NODE TYPE Person (name STRING)";
    let create_graph_type = "CREATE GRAPH TYPE social (\
        NODE TYPE Person (name STRING)\
    )";
    sess.execute(create_node_type).unwrap();
    sess.execute(create_graph_type).unwrap();

    let shown = sess.execute("SHOW GRAPH TYPE social").unwrap();
    assert_eq!(shown.rows.len(), 1);

    let type_name = &shown.rows[0][0];
    assert_eq!(*type_name, Value::from("social"));
}
5383
/// `SHOW GRAPH TYPE` for a name that was never created yields an error.
#[test]
fn test_show_graph_type_not_found() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // No graph type has been created on this database.
    let outcome = sess.execute("SHOW GRAPH TYPE nonexistent");
    assert!(outcome.is_err());
}
5392
/// `SHOW INDEXES` succeeds on a fresh database and returns the expected column names.
#[test]
fn test_show_indexes_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let listing = sess.execute("SHOW INDEXES").unwrap();

    let expected_columns = vec!["name", "type", "label", "property"];
    assert_eq!(listing.columns, expected_columns);
}
5401
/// `SHOW CONSTRAINTS` succeeds on a fresh database and returns the expected column names.
#[test]
fn test_show_constraints_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let listing = sess.execute("SHOW CONSTRAINTS").unwrap();

    let expected_columns = vec!["name", "type", "label", "properties"];
    assert_eq!(listing.columns, expected_columns);
}
5410
/// A graph type declared with the pattern form can be created and then shown by name.
#[test]
fn test_pattern_form_graph_type_roundtrip() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Register the node and edge types up front, in the same order as listed here.
    let type_definitions = [
        "CREATE NODE TYPE Person (name STRING NOT NULL)",
        "CREATE NODE TYPE City (name STRING)",
        "CREATE EDGE TYPE KNOWS (since INTEGER)",
        "CREATE EDGE TYPE LIVES_IN",
    ];
    for definition in type_definitions {
        sess.execute(definition).unwrap();
    }

    // Declare the graph type with two edge patterns instead of explicit type clauses.
    let create_graph_type = "CREATE GRAPH TYPE social (\
        (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
        (:Person)-[:LIVES_IN]->(:City)\
    )";
    sess.execute(create_graph_type).unwrap();

    // The graph type must now be visible under its name.
    let shown = sess.execute("SHOW GRAPH TYPE social").unwrap();
    assert_eq!(shown.rows.len(), 1);

    let type_name = &shown.rows[0][0];
    assert_eq!(*type_name, Value::from("social"));
}
5443    }
5444}