Skip to main content

grafeo_engine/session/
mod.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7#[cfg(feature = "rdf")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::time::{Duration, Instant};
13
14use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId, Value};
15use grafeo_common::utils::error::Result;
16use grafeo_core::graph::Direction;
17use grafeo_core::graph::GraphStoreMut;
18use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
19#[cfg(feature = "rdf")]
20use grafeo_core::graph::rdf::RdfStore;
21
22use crate::catalog::{Catalog, CatalogConstraintValidator};
23use crate::config::{AdaptiveConfig, GraphModel};
24use crate::database::QueryResult;
25use crate::query::cache::QueryCache;
26use crate::transaction::TransactionManager;
27
28/// Parses a DDL default-value literal string into a [`Value`].
29///
30/// Handles string literals (single- or double-quoted), integers, floats,
31/// booleans (`true`/`false`), and `NULL`.
32fn parse_default_literal(text: &str) -> Value {
33    if text.eq_ignore_ascii_case("null") {
34        return Value::Null;
35    }
36    if text.eq_ignore_ascii_case("true") {
37        return Value::Bool(true);
38    }
39    if text.eq_ignore_ascii_case("false") {
40        return Value::Bool(false);
41    }
42    // String literal: strip surrounding quotes
43    if (text.starts_with('\'') && text.ends_with('\''))
44        || (text.starts_with('"') && text.ends_with('"'))
45    {
46        return Value::String(text[1..text.len() - 1].into());
47    }
48    // Try integer, then float
49    if let Ok(i) = text.parse::<i64>() {
50        return Value::Int64(i);
51    }
52    if let Ok(f) = text.parse::<f64>() {
53        return Value::Float64(f);
54    }
55    // Fallback: treat as string
56    Value::String(text.into())
57}
58
59/// Runtime configuration for creating a new session.
60///
61/// Groups the shared parameters passed to all session constructors, keeping
62/// call sites readable and avoiding long argument lists.
63pub(crate) struct SessionConfig {
64    pub transaction_manager: Arc<TransactionManager>,
65    pub query_cache: Arc<QueryCache>,
66    pub catalog: Arc<Catalog>,
67    pub adaptive_config: AdaptiveConfig,
68    pub factorized_execution: bool,
69    pub graph_model: GraphModel,
70    pub query_timeout: Option<Duration>,
71    pub commit_counter: Arc<AtomicUsize>,
72    pub gc_interval: usize,
73}
74
75/// Your handle to the database - execute queries and manage transactions.
76///
77/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
78/// tracks its own transaction state, so you can have multiple concurrent
79/// sessions without them interfering.
80pub struct Session {
81    /// The underlying store.
82    store: Arc<LpgStore>,
83    /// Graph store trait object for pluggable storage backends.
84    graph_store: Arc<dyn GraphStoreMut>,
85    /// Schema and metadata catalog shared across sessions.
86    catalog: Arc<Catalog>,
87    /// RDF triple store (if RDF feature is enabled).
88    #[cfg(feature = "rdf")]
89    rdf_store: Arc<RdfStore>,
90    /// Transaction manager.
91    transaction_manager: Arc<TransactionManager>,
92    /// Query cache shared across sessions.
93    query_cache: Arc<QueryCache>,
94    /// Current transaction ID (if any). Behind a Mutex so that GQL commands
95    /// (`START TRANSACTION`, `COMMIT`, `ROLLBACK`) can manage transactions
96    /// from within `execute(&self)`.
97    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
98    /// Whether the current transaction is read-only (blocks mutations).
99    read_only_tx: parking_lot::Mutex<bool>,
100    /// Whether the session is in auto-commit mode.
101    auto_commit: bool,
102    /// Adaptive execution configuration.
103    #[allow(dead_code)] // Stored for future adaptive re-optimization during execution
104    adaptive_config: AdaptiveConfig,
105    /// Whether to use factorized execution for multi-hop queries.
106    factorized_execution: bool,
107    /// The graph data model this session operates on.
108    graph_model: GraphModel,
109    /// Maximum time a query may run before being cancelled.
110    query_timeout: Option<Duration>,
111    /// Shared commit counter for triggering auto-GC.
112    commit_counter: Arc<AtomicUsize>,
113    /// GC every N commits (0 = disabled).
114    gc_interval: usize,
115    /// Node count at the start of the current transaction (for PreparedCommit stats).
116    transaction_start_node_count: AtomicUsize,
117    /// Edge count at the start of the current transaction (for PreparedCommit stats).
118    transaction_start_edge_count: AtomicUsize,
119    /// WAL for logging schema changes.
120    #[cfg(feature = "wal")]
121    wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
122    /// Shared WAL graph context tracker for named graph awareness.
123    #[cfg(feature = "wal")]
124    wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
125    /// CDC log for change tracking.
126    #[cfg(feature = "cdc")]
127    cdc_log: Arc<crate::cdc::CdcLog>,
128    /// Current graph name (for multi-graph USE GRAPH support). None = default graph.
129    current_graph: parking_lot::Mutex<Option<String>>,
130    /// Current schema name (ISO/IEC 39075 Section 4.7.3: independent from session graph).
131    /// None = "not set" (uses default schema).
132    current_schema: parking_lot::Mutex<Option<String>>,
133    /// Session time zone override.
134    time_zone: parking_lot::Mutex<Option<String>>,
135    /// Session-level parameters (SET PARAMETER).
136    session_params:
137        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
138    /// Override epoch for time-travel queries (None = use transaction/current epoch).
139    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
140    /// Savepoints within the current transaction.
141    savepoints: parking_lot::Mutex<Vec<SavepointState>>,
142    /// Nesting depth for nested transactions (0 = outermost).
143    /// Nested `START TRANSACTION` creates an auto-savepoint; nested `COMMIT`
144    /// releases it, nested `ROLLBACK` rolls back to it.
145    transaction_nesting_depth: parking_lot::Mutex<u32>,
146    /// Named graphs touched during the current transaction (for cross-graph atomicity).
147    /// `None` represents the default graph. Populated at `BEGIN` time and on each
148    /// `USE GRAPH` / `SESSION SET GRAPH` switch within a transaction.
149    touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
150    /// Shared metrics registry (populated when the `metrics` feature is enabled).
151    #[cfg(feature = "metrics")]
152    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
153    /// Transaction start time for duration tracking.
154    #[cfg(feature = "metrics")]
155    tx_start_time: parking_lot::Mutex<Option<Instant>>,
156}
157
158/// Per-graph savepoint snapshot, capturing the store state at the time of the savepoint.
159#[derive(Clone)]
160struct GraphSavepoint {
161    graph_name: Option<String>,
162    next_node_id: u64,
163    next_edge_id: u64,
164    undo_log_position: usize,
165}
166
167/// Savepoint state: name + per-graph snapshots + the graph that was active.
168#[derive(Clone)]
169struct SavepointState {
170    name: String,
171    graph_snapshots: Vec<GraphSavepoint>,
172    /// The graph that was active when the savepoint was created.
173    /// Reserved for future use (e.g., restoring graph context on rollback).
174    #[allow(dead_code)]
175    active_graph: Option<String>,
176}
177
178impl Session {
179    /// Creates a new session with adaptive execution configuration.
180    #[allow(dead_code)]
181    pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
182        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
183        Self {
184            store,
185            graph_store,
186            catalog: cfg.catalog,
187            #[cfg(feature = "rdf")]
188            rdf_store: Arc::new(RdfStore::new()),
189            transaction_manager: cfg.transaction_manager,
190            query_cache: cfg.query_cache,
191            current_transaction: parking_lot::Mutex::new(None),
192            read_only_tx: parking_lot::Mutex::new(false),
193            auto_commit: true,
194            adaptive_config: cfg.adaptive_config,
195            factorized_execution: cfg.factorized_execution,
196            graph_model: cfg.graph_model,
197            query_timeout: cfg.query_timeout,
198            commit_counter: cfg.commit_counter,
199            gc_interval: cfg.gc_interval,
200            transaction_start_node_count: AtomicUsize::new(0),
201            transaction_start_edge_count: AtomicUsize::new(0),
202            #[cfg(feature = "wal")]
203            wal: None,
204            #[cfg(feature = "wal")]
205            wal_graph_context: None,
206            #[cfg(feature = "cdc")]
207            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
208            current_graph: parking_lot::Mutex::new(None),
209            current_schema: parking_lot::Mutex::new(None),
210            time_zone: parking_lot::Mutex::new(None),
211            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
212            viewing_epoch_override: parking_lot::Mutex::new(None),
213            savepoints: parking_lot::Mutex::new(Vec::new()),
214            transaction_nesting_depth: parking_lot::Mutex::new(0),
215            touched_graphs: parking_lot::Mutex::new(Vec::new()),
216            #[cfg(feature = "metrics")]
217            metrics: None,
218            #[cfg(feature = "metrics")]
219            tx_start_time: parking_lot::Mutex::new(None),
220        }
221    }
222
223    /// Sets the WAL for this session (shared with the database).
224    ///
225    /// This also wraps `graph_store` in a [`WalGraphStore`] so that mutation
226    /// operators (INSERT, DELETE, SET via queries) log to the WAL.
227    #[cfg(feature = "wal")]
228    pub(crate) fn set_wal(
229        &mut self,
230        wal: Arc<grafeo_adapters::storage::wal::LpgWal>,
231        wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
232    ) {
233        // Wrap the graph store so query-engine mutations are WAL-logged
234        self.graph_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
235            Arc::clone(&self.store),
236            Arc::clone(&wal),
237            Arc::clone(&wal_graph_context),
238        ));
239        self.wal = Some(wal);
240        self.wal_graph_context = Some(wal_graph_context);
241    }
242
243    /// Sets the CDC log for this session (shared with the database).
244    #[cfg(feature = "cdc")]
245    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
246        self.cdc_log = cdc_log;
247    }
248
249    /// Sets the metrics registry for this session (shared with the database).
250    #[cfg(feature = "metrics")]
251    pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
252        self.metrics = Some(metrics);
253    }
254
255    /// Creates a session backed by an external graph store.
256    ///
257    /// The external store handles all data operations. Transaction management
258    /// (begin/commit/rollback) is not supported for external stores.
259    ///
260    /// # Errors
261    ///
262    /// Returns an error if the internal arena allocation fails (out of memory).
263    pub(crate) fn with_external_store(
264        store: Arc<dyn GraphStoreMut>,
265        cfg: SessionConfig,
266    ) -> Result<Self> {
267        Ok(Self {
268            store: Arc::new(LpgStore::new()?),
269            graph_store: store,
270            catalog: cfg.catalog,
271            #[cfg(feature = "rdf")]
272            rdf_store: Arc::new(RdfStore::new()),
273            transaction_manager: cfg.transaction_manager,
274            query_cache: cfg.query_cache,
275            current_transaction: parking_lot::Mutex::new(None),
276            read_only_tx: parking_lot::Mutex::new(false),
277            auto_commit: true,
278            adaptive_config: cfg.adaptive_config,
279            factorized_execution: cfg.factorized_execution,
280            graph_model: cfg.graph_model,
281            query_timeout: cfg.query_timeout,
282            commit_counter: cfg.commit_counter,
283            gc_interval: cfg.gc_interval,
284            transaction_start_node_count: AtomicUsize::new(0),
285            transaction_start_edge_count: AtomicUsize::new(0),
286            #[cfg(feature = "wal")]
287            wal: None,
288            #[cfg(feature = "wal")]
289            wal_graph_context: None,
290            #[cfg(feature = "cdc")]
291            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
292            current_graph: parking_lot::Mutex::new(None),
293            current_schema: parking_lot::Mutex::new(None),
294            time_zone: parking_lot::Mutex::new(None),
295            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
296            viewing_epoch_override: parking_lot::Mutex::new(None),
297            savepoints: parking_lot::Mutex::new(Vec::new()),
298            transaction_nesting_depth: parking_lot::Mutex::new(0),
299            touched_graphs: parking_lot::Mutex::new(Vec::new()),
300            #[cfg(feature = "metrics")]
301            metrics: None,
302            #[cfg(feature = "metrics")]
303            tx_start_time: parking_lot::Mutex::new(None),
304        })
305    }
306
307    /// Returns the graph model this session operates on.
308    #[must_use]
309    pub fn graph_model(&self) -> GraphModel {
310        self.graph_model
311    }
312
313    // === Session State Management ===
314
315    /// Sets the current graph for this session (USE GRAPH).
316    pub fn use_graph(&self, name: &str) {
317        *self.current_graph.lock() = Some(name.to_string());
318    }
319
320    /// Returns the current graph name, if any.
321    #[must_use]
322    pub fn current_graph(&self) -> Option<String> {
323        self.current_graph.lock().clone()
324    }
325
326    /// Sets the current schema for this session (SESSION SET SCHEMA).
327    ///
328    /// Per ISO/IEC 39075 Section 7.1 GR1, this is independent of the session graph.
329    pub fn set_schema(&self, name: &str) {
330        *self.current_schema.lock() = Some(name.to_string());
331    }
332
333    /// Returns the current schema name, if any.
334    ///
335    /// `None` means "not set", which resolves to the default schema.
336    #[must_use]
337    pub fn current_schema(&self) -> Option<String> {
338        self.current_schema.lock().clone()
339    }
340
341    /// Computes the effective storage key for a graph, accounting for schema context.
342    ///
343    /// Per ISO/IEC 39075 Section 17.2, graphs resolve relative to the current schema.
344    /// Uses `/` as separator since it is invalid in GQL identifiers.
345    fn effective_graph_key(&self, graph_name: &str) -> String {
346        let schema = self.current_schema.lock().clone();
347        match schema {
348            Some(s) => format!("{s}/{graph_name}"),
349            None => graph_name.to_string(),
350        }
351    }
352
353    /// Returns the effective storage key for the current graph, accounting for schema.
354    ///
355    /// Combines `current_schema` and `current_graph` into a flat lookup key.
356    fn active_graph_storage_key(&self) -> Option<String> {
357        let graph = self.current_graph.lock().clone();
358        let schema = self.current_schema.lock().clone();
359        match (schema, graph) {
360            (_, None) => None,
361            (_, Some(ref name)) if name.eq_ignore_ascii_case("default") => None,
362            (None, Some(name)) => Some(name),
363            (Some(s), Some(g)) => Some(format!("{s}/{g}")),
364        }
365    }
366
367    /// Returns the graph store for the currently active graph.
368    ///
369    /// If `current_graph` is `None` or `"default"`, returns the session's
370    /// default `graph_store` (already WAL-wrapped for the default graph).
371    /// Otherwise looks up the named graph in the root store and wraps it
372    /// in a [`WalGraphStore`] so mutations are WAL-logged with the correct
373    /// graph context.
374    fn active_store(&self) -> Arc<dyn GraphStoreMut> {
375        let key = self.active_graph_storage_key();
376        match key {
377            None => Arc::clone(&self.graph_store),
378            Some(ref name) => match self.store.graph(name) {
379                Some(named_store) => {
380                    #[cfg(feature = "wal")]
381                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
382                        return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
383                            named_store,
384                            Arc::clone(wal),
385                            name.clone(),
386                            Arc::clone(ctx),
387                        )) as Arc<dyn GraphStoreMut>;
388                    }
389                    named_store as Arc<dyn GraphStoreMut>
390                }
391                None => Arc::clone(&self.graph_store),
392            },
393        }
394    }
395
396    /// Returns the concrete `LpgStore` for the currently active graph.
397    ///
398    /// Used by direct CRUD methods that need the concrete store type
399    /// for versioned operations.
400    fn active_lpg_store(&self) -> Arc<LpgStore> {
401        let key = self.active_graph_storage_key();
402        match key {
403            None => Arc::clone(&self.store),
404            Some(ref name) => self
405                .store
406                .graph(name)
407                .unwrap_or_else(|| Arc::clone(&self.store)),
408        }
409    }
410
411    /// Resolves a graph name to a concrete `LpgStore`.
412    /// `None` and `"default"` resolve to the session's root store.
413    fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
414        match graph_name {
415            None => Arc::clone(&self.store),
416            Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
417            Some(name) => self
418                .store
419                .graph(name)
420                .unwrap_or_else(|| Arc::clone(&self.store)),
421        }
422    }
423
424    /// Records the current graph as "touched" if a transaction is active.
425    ///
426    /// Uses the full storage key (schema/graph) so that commit/rollback
427    /// can resolve the correct store via `resolve_store`.
428    fn track_graph_touch(&self) {
429        if self.current_transaction.lock().is_some() {
430            let key = self.active_graph_storage_key();
431            let mut touched = self.touched_graphs.lock();
432            if !touched.contains(&key) {
433                touched.push(key);
434            }
435        }
436    }
437
438    /// Sets the session time zone.
439    pub fn set_time_zone(&self, tz: &str) {
440        *self.time_zone.lock() = Some(tz.to_string());
441    }
442
443    /// Returns the session time zone, if set.
444    #[must_use]
445    pub fn time_zone(&self) -> Option<String> {
446        self.time_zone.lock().clone()
447    }
448
449    /// Sets a session parameter.
450    pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
451        self.session_params.lock().insert(key.to_string(), value);
452    }
453
454    /// Gets a session parameter by cloning it.
455    #[must_use]
456    pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
457        self.session_params.lock().get(key).cloned()
458    }
459
460    /// Resets all session state to defaults (ISO/IEC 39075 Section 7.2).
461    pub fn reset_session(&self) {
462        *self.current_schema.lock() = None;
463        *self.current_graph.lock() = None;
464        *self.time_zone.lock() = None;
465        self.session_params.lock().clear();
466        *self.viewing_epoch_override.lock() = None;
467    }
468
469    /// Resets only the session schema (Section 7.2 GR1).
470    pub fn reset_schema(&self) {
471        *self.current_schema.lock() = None;
472    }
473
474    /// Resets only the session graph (Section 7.2 GR2).
475    pub fn reset_graph(&self) {
476        *self.current_graph.lock() = None;
477    }
478
479    /// Resets only the session time zone (Section 7.2 GR3).
480    pub fn reset_time_zone(&self) {
481        *self.time_zone.lock() = None;
482    }
483
484    /// Resets only session parameters (Section 7.2 GR4).
485    pub fn reset_parameters(&self) {
486        self.session_params.lock().clear();
487    }
488
489    // --- Time-travel API ---
490
491    /// Sets a viewing epoch override for time-travel queries.
492    ///
493    /// While set, all queries on this session see the database as it existed
494    /// at the given epoch. Use [`clear_viewing_epoch`](Self::clear_viewing_epoch)
495    /// to return to normal behavior.
496    pub fn set_viewing_epoch(&self, epoch: EpochId) {
497        *self.viewing_epoch_override.lock() = Some(epoch);
498    }
499
500    /// Clears the viewing epoch override, returning to normal behavior.
501    pub fn clear_viewing_epoch(&self) {
502        *self.viewing_epoch_override.lock() = None;
503    }
504
505    /// Returns the current viewing epoch override, if any.
506    #[must_use]
507    pub fn viewing_epoch(&self) -> Option<EpochId> {
508        *self.viewing_epoch_override.lock()
509    }
510
511    /// Returns all versions of a node with their creation/deletion epochs.
512    ///
513    /// Properties and labels reflect the current state (not versioned per-epoch).
514    #[must_use]
515    pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
516        self.active_lpg_store().get_node_history(id)
517    }
518
519    /// Returns all versions of an edge with their creation/deletion epochs.
520    ///
521    /// Properties reflect the current state (not versioned per-epoch).
522    #[must_use]
523    pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
524        self.active_lpg_store().get_edge_history(id)
525    }
526
527    /// Checks that the session's graph model supports LPG operations.
528    fn require_lpg(&self, language: &str) -> Result<()> {
529        if self.graph_model == GraphModel::Rdf {
530            return Err(grafeo_common::utils::error::Error::Internal(format!(
531                "This is an RDF database. {language} queries require an LPG database."
532            )));
533        }
534        Ok(())
535    }
536
537    /// Executes a session or transaction command, returning an empty result.
538    #[cfg(feature = "gql")]
539    fn execute_session_command(
540        &self,
541        cmd: grafeo_adapters::query::gql::ast::SessionCommand,
542    ) -> Result<QueryResult> {
543        use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
544        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
545
546        // Block DDL in read-only transactions (ISO/IEC 39075 Section 8)
547        if *self.read_only_tx.lock() {
548            match &cmd {
549                SessionCommand::CreateGraph { .. } | SessionCommand::DropGraph { .. } => {
550                    return Err(Error::Transaction(
551                        grafeo_common::utils::error::TransactionError::ReadOnly,
552                    ));
553                }
554                _ => {} // Session state + transaction control allowed
555            }
556        }
557
558        match cmd {
559            SessionCommand::CreateGraph {
560                name,
561                if_not_exists,
562                typed,
563                like_graph,
564                copy_of,
565                open: _,
566            } => {
567                // ISO/IEC 39075 Section 12.4: graphs are created within the current schema
568                let storage_key = self.effective_graph_key(&name);
569
570                // Validate source graph exists for LIKE / AS COPY OF
571                if let Some(ref src) = like_graph {
572                    let src_key = self.effective_graph_key(src);
573                    if self.store.graph(&src_key).is_none() {
574                        return Err(Error::Query(QueryError::new(
575                            QueryErrorKind::Semantic,
576                            format!("Source graph '{src}' does not exist"),
577                        )));
578                    }
579                }
580                if let Some(ref src) = copy_of {
581                    let src_key = self.effective_graph_key(src);
582                    if self.store.graph(&src_key).is_none() {
583                        return Err(Error::Query(QueryError::new(
584                            QueryErrorKind::Semantic,
585                            format!("Source graph '{src}' does not exist"),
586                        )));
587                    }
588                }
589
590                let created = self
591                    .store
592                    .create_graph(&storage_key)
593                    .map_err(|e| Error::Internal(e.to_string()))?;
594                if !created && !if_not_exists {
595                    return Err(Error::Query(QueryError::new(
596                        QueryErrorKind::Semantic,
597                        format!("Graph '{name}' already exists"),
598                    )));
599                }
600                if created {
601                    #[cfg(feature = "wal")]
602                    self.log_schema_wal(
603                        &grafeo_adapters::storage::wal::WalRecord::CreateNamedGraph {
604                            name: storage_key.clone(),
605                        },
606                    );
607                }
608
609                // AS COPY OF: copy data from source graph
610                if let Some(ref src) = copy_of {
611                    let src_key = self.effective_graph_key(src);
612                    self.store
613                        .copy_graph(Some(&src_key), Some(&storage_key))
614                        .map_err(|e| Error::Internal(e.to_string()))?;
615                }
616
617                // Bind to graph type if specified
618                if let Some(type_name) = typed
619                    && let Err(e) = self
620                        .catalog
621                        .bind_graph_type(&storage_key, type_name.clone())
622                {
623                    return Err(Error::Query(QueryError::new(
624                        QueryErrorKind::Semantic,
625                        e.to_string(),
626                    )));
627                }
628
629                // LIKE: copy graph type binding from source
630                if let Some(ref src) = like_graph {
631                    let src_key = self.effective_graph_key(src);
632                    if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
633                        let _ = self.catalog.bind_graph_type(&storage_key, src_type);
634                    }
635                }
636
637                Ok(QueryResult::empty())
638            }
639            SessionCommand::DropGraph { name, if_exists } => {
640                let storage_key = self.effective_graph_key(&name);
641                let dropped = self.store.drop_graph(&storage_key);
642                if !dropped && !if_exists {
643                    return Err(Error::Query(QueryError::new(
644                        QueryErrorKind::Semantic,
645                        format!("Graph '{name}' does not exist"),
646                    )));
647                }
648                if dropped {
649                    #[cfg(feature = "wal")]
650                    self.log_schema_wal(
651                        &grafeo_adapters::storage::wal::WalRecord::DropNamedGraph {
652                            name: storage_key.clone(),
653                        },
654                    );
655                    // If this session was using the dropped graph, reset to default
656                    let mut current = self.current_graph.lock();
657                    if current
658                        .as_deref()
659                        .is_some_and(|g| g.eq_ignore_ascii_case(&name))
660                    {
661                        *current = None;
662                    }
663                }
664                Ok(QueryResult::empty())
665            }
666            SessionCommand::UseGraph(name) => {
667                // Verify graph exists (resolve within current schema)
668                let effective_key = self.effective_graph_key(&name);
669                if !name.eq_ignore_ascii_case("default")
670                    && self.store.graph(&effective_key).is_none()
671                {
672                    return Err(Error::Query(QueryError::new(
673                        QueryErrorKind::Semantic,
674                        format!("Graph '{name}' does not exist"),
675                    )));
676                }
677                self.use_graph(&name);
678                // Track the new graph if in a transaction
679                self.track_graph_touch();
680                Ok(QueryResult::empty())
681            }
682            SessionCommand::SessionSetGraph(name) => {
683                // ISO/IEC 39075 Section 7.1 GR2: set session graph (resolved within current schema)
684                let effective_key = self.effective_graph_key(&name);
685                if !name.eq_ignore_ascii_case("default")
686                    && self.store.graph(&effective_key).is_none()
687                {
688                    return Err(Error::Query(QueryError::new(
689                        QueryErrorKind::Semantic,
690                        format!("Graph '{name}' does not exist"),
691                    )));
692                }
693                self.use_graph(&name);
694                // Track the new graph if in a transaction
695                self.track_graph_touch();
696                Ok(QueryResult::empty())
697            }
698            SessionCommand::SessionSetSchema(name) => {
699                // ISO/IEC 39075 Section 7.1 GR1: set session schema (independent of graph)
700                if !self.catalog.schema_exists(&name) {
701                    return Err(Error::Query(QueryError::new(
702                        QueryErrorKind::Semantic,
703                        format!("Schema '{name}' does not exist"),
704                    )));
705                }
706                self.set_schema(&name);
707                Ok(QueryResult::empty())
708            }
709            SessionCommand::SessionSetTimeZone(tz) => {
710                self.set_time_zone(&tz);
711                Ok(QueryResult::empty())
712            }
713            SessionCommand::SessionSetParameter(key, expr) => {
714                if key.eq_ignore_ascii_case("viewing_epoch") {
715                    match Self::eval_integer_literal(&expr) {
716                        Some(n) if n >= 0 => {
717                            self.set_viewing_epoch(EpochId::new(n as u64));
718                            Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
719                        }
720                        _ => Err(Error::Query(QueryError::new(
721                            QueryErrorKind::Semantic,
722                            "viewing_epoch must be a non-negative integer literal",
723                        ))),
724                    }
725                } else {
726                    // For now, store parameter name with Null value.
727                    // Full expression evaluation would require building and executing a plan.
728                    self.set_parameter(&key, Value::Null);
729                    Ok(QueryResult::empty())
730                }
731            }
732            SessionCommand::SessionReset(target) => {
733                use grafeo_adapters::query::gql::ast::SessionResetTarget;
734                match target {
735                    SessionResetTarget::All => self.reset_session(),
736                    SessionResetTarget::Schema => self.reset_schema(),
737                    SessionResetTarget::Graph => self.reset_graph(),
738                    SessionResetTarget::TimeZone => self.reset_time_zone(),
739                    SessionResetTarget::Parameters => self.reset_parameters(),
740                }
741                Ok(QueryResult::empty())
742            }
743            SessionCommand::SessionClose => {
744                self.reset_session();
745                Ok(QueryResult::empty())
746            }
747            SessionCommand::StartTransaction {
748                read_only,
749                isolation_level,
750            } => {
751                let engine_level = isolation_level.map(|l| match l {
752                    TransactionIsolationLevel::ReadCommitted => {
753                        crate::transaction::IsolationLevel::ReadCommitted
754                    }
755                    TransactionIsolationLevel::SnapshotIsolation => {
756                        crate::transaction::IsolationLevel::SnapshotIsolation
757                    }
758                    TransactionIsolationLevel::Serializable => {
759                        crate::transaction::IsolationLevel::Serializable
760                    }
761                });
762                self.begin_transaction_inner(read_only, engine_level)?;
763                Ok(QueryResult::status("Transaction started"))
764            }
765            SessionCommand::Commit => {
766                self.commit_inner()?;
767                Ok(QueryResult::status("Transaction committed"))
768            }
769            SessionCommand::Rollback => {
770                self.rollback_inner()?;
771                Ok(QueryResult::status("Transaction rolled back"))
772            }
773            SessionCommand::Savepoint(name) => {
774                self.savepoint(&name)?;
775                Ok(QueryResult::status(format!("Savepoint '{name}' created")))
776            }
777            SessionCommand::RollbackToSavepoint(name) => {
778                self.rollback_to_savepoint(&name)?;
779                Ok(QueryResult::status(format!(
780                    "Rolled back to savepoint '{name}'"
781                )))
782            }
783            SessionCommand::ReleaseSavepoint(name) => {
784                self.release_savepoint(&name)?;
785                Ok(QueryResult::status(format!("Savepoint '{name}' released")))
786            }
787        }
788    }
789
790    /// Logs a WAL record for a schema change (no-op if WAL is not enabled).
791    #[cfg(feature = "wal")]
792    fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
793        if let Some(ref wal) = self.wal
794            && let Err(e) = wal.log(record)
795        {
796            tracing::warn!("Failed to log schema change to WAL: {}", e);
797        }
798    }
799
800    /// Executes a schema DDL command, returning a status result.
801    #[cfg(feature = "gql")]
802    fn execute_schema_command(
803        &self,
804        cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
805    ) -> Result<QueryResult> {
806        use crate::catalog::{
807            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
808        };
809        use grafeo_adapters::query::gql::ast::SchemaStatement;
810        #[cfg(feature = "wal")]
811        use grafeo_adapters::storage::wal::WalRecord;
812        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
813
814        /// Logs a WAL record for schema changes. Compiles to nothing without `wal`.
815        macro_rules! wal_log {
816            ($self:expr, $record:expr) => {
817                #[cfg(feature = "wal")]
818                $self.log_schema_wal(&$record);
819            };
820        }
821
822        let result = match cmd {
823            SchemaStatement::CreateNodeType(stmt) => {
824                #[cfg(feature = "wal")]
825                let props_for_wal: Vec<(String, String, bool)> = stmt
826                    .properties
827                    .iter()
828                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
829                    .collect();
830                let def = NodeTypeDefinition {
831                    name: stmt.name.clone(),
832                    properties: stmt
833                        .properties
834                        .iter()
835                        .map(|p| TypedProperty {
836                            name: p.name.clone(),
837                            data_type: PropertyDataType::from_type_name(&p.data_type),
838                            nullable: p.nullable,
839                            default_value: p
840                                .default_value
841                                .as_ref()
842                                .map(|s| parse_default_literal(s)),
843                        })
844                        .collect(),
845                    constraints: Vec::new(),
846                    parent_types: stmt.parent_types.clone(),
847                };
848                let result = if stmt.or_replace {
849                    let _ = self.catalog.drop_node_type(&stmt.name);
850                    self.catalog.register_node_type(def)
851                } else {
852                    self.catalog.register_node_type(def)
853                };
854                match result {
855                    Ok(()) => {
856                        wal_log!(
857                            self,
858                            WalRecord::CreateNodeType {
859                                name: stmt.name.clone(),
860                                properties: props_for_wal,
861                                constraints: Vec::new(),
862                            }
863                        );
864                        Ok(QueryResult::status(format!(
865                            "Created node type '{}'",
866                            stmt.name
867                        )))
868                    }
869                    Err(e) if stmt.if_not_exists => {
870                        let _ = e;
871                        Ok(QueryResult::status("No change"))
872                    }
873                    Err(e) => Err(Error::Query(QueryError::new(
874                        QueryErrorKind::Semantic,
875                        e.to_string(),
876                    ))),
877                }
878            }
879            SchemaStatement::CreateEdgeType(stmt) => {
880                #[cfg(feature = "wal")]
881                let props_for_wal: Vec<(String, String, bool)> = stmt
882                    .properties
883                    .iter()
884                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
885                    .collect();
886                let def = EdgeTypeDefinition {
887                    name: stmt.name.clone(),
888                    properties: stmt
889                        .properties
890                        .iter()
891                        .map(|p| TypedProperty {
892                            name: p.name.clone(),
893                            data_type: PropertyDataType::from_type_name(&p.data_type),
894                            nullable: p.nullable,
895                            default_value: p
896                                .default_value
897                                .as_ref()
898                                .map(|s| parse_default_literal(s)),
899                        })
900                        .collect(),
901                    constraints: Vec::new(),
902                    source_node_types: stmt.source_node_types.clone(),
903                    target_node_types: stmt.target_node_types.clone(),
904                };
905                let result = if stmt.or_replace {
906                    let _ = self.catalog.drop_edge_type_def(&stmt.name);
907                    self.catalog.register_edge_type_def(def)
908                } else {
909                    self.catalog.register_edge_type_def(def)
910                };
911                match result {
912                    Ok(()) => {
913                        wal_log!(
914                            self,
915                            WalRecord::CreateEdgeType {
916                                name: stmt.name.clone(),
917                                properties: props_for_wal,
918                                constraints: Vec::new(),
919                            }
920                        );
921                        Ok(QueryResult::status(format!(
922                            "Created edge type '{}'",
923                            stmt.name
924                        )))
925                    }
926                    Err(e) if stmt.if_not_exists => {
927                        let _ = e;
928                        Ok(QueryResult::status("No change"))
929                    }
930                    Err(e) => Err(Error::Query(QueryError::new(
931                        QueryErrorKind::Semantic,
932                        e.to_string(),
933                    ))),
934                }
935            }
936            SchemaStatement::CreateVectorIndex(stmt) => {
937                Self::create_vector_index_on_store(
938                    &self.active_lpg_store(),
939                    &stmt.node_label,
940                    &stmt.property,
941                    stmt.dimensions,
942                    stmt.metric.as_deref(),
943                )?;
944                wal_log!(
945                    self,
946                    WalRecord::CreateIndex {
947                        name: stmt.name.clone(),
948                        label: stmt.node_label.clone(),
949                        property: stmt.property.clone(),
950                        index_type: "vector".to_string(),
951                    }
952                );
953                Ok(QueryResult::status(format!(
954                    "Created vector index '{}'",
955                    stmt.name
956                )))
957            }
958            SchemaStatement::DropNodeType { name, if_exists } => {
959                match self.catalog.drop_node_type(&name) {
960                    Ok(()) => {
961                        wal_log!(self, WalRecord::DropNodeType { name: name.clone() });
962                        Ok(QueryResult::status(format!("Dropped node type '{name}'")))
963                    }
964                    Err(e) if if_exists => {
965                        let _ = e;
966                        Ok(QueryResult::status("No change"))
967                    }
968                    Err(e) => Err(Error::Query(QueryError::new(
969                        QueryErrorKind::Semantic,
970                        e.to_string(),
971                    ))),
972                }
973            }
974            SchemaStatement::DropEdgeType { name, if_exists } => {
975                match self.catalog.drop_edge_type_def(&name) {
976                    Ok(()) => {
977                        wal_log!(self, WalRecord::DropEdgeType { name: name.clone() });
978                        Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
979                    }
980                    Err(e) if if_exists => {
981                        let _ = e;
982                        Ok(QueryResult::status("No change"))
983                    }
984                    Err(e) => Err(Error::Query(QueryError::new(
985                        QueryErrorKind::Semantic,
986                        e.to_string(),
987                    ))),
988                }
989            }
990            SchemaStatement::CreateIndex(stmt) => {
991                use grafeo_adapters::query::gql::ast::IndexKind;
992                let active = self.active_lpg_store();
993                let index_type_str = match stmt.index_kind {
994                    IndexKind::Property => "property",
995                    IndexKind::BTree => "btree",
996                    IndexKind::Text => "text",
997                    IndexKind::Vector => "vector",
998                };
999                match stmt.index_kind {
1000                    IndexKind::Property | IndexKind::BTree => {
1001                        for prop in &stmt.properties {
1002                            active.create_property_index(prop);
1003                        }
1004                    }
1005                    IndexKind::Text => {
1006                        for prop in &stmt.properties {
1007                            Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1008                        }
1009                    }
1010                    IndexKind::Vector => {
1011                        for prop in &stmt.properties {
1012                            Self::create_vector_index_on_store(
1013                                &active,
1014                                &stmt.label,
1015                                prop,
1016                                stmt.options.dimensions,
1017                                stmt.options.metric.as_deref(),
1018                            )?;
1019                        }
1020                    }
1021                }
1022                #[cfg(feature = "wal")]
1023                for prop in &stmt.properties {
1024                    wal_log!(
1025                        self,
1026                        WalRecord::CreateIndex {
1027                            name: stmt.name.clone(),
1028                            label: stmt.label.clone(),
1029                            property: prop.clone(),
1030                            index_type: index_type_str.to_string(),
1031                        }
1032                    );
1033                }
1034                Ok(QueryResult::status(format!(
1035                    "Created {} index '{}'",
1036                    index_type_str, stmt.name
1037                )))
1038            }
1039            SchemaStatement::DropIndex { name, if_exists } => {
1040                // Try to drop property index by name
1041                let dropped = self.active_lpg_store().drop_property_index(&name);
1042                if dropped || if_exists {
1043                    if dropped {
1044                        wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1045                    }
1046                    Ok(QueryResult::status(if dropped {
1047                        format!("Dropped index '{name}'")
1048                    } else {
1049                        "No change".to_string()
1050                    }))
1051                } else {
1052                    Err(Error::Query(QueryError::new(
1053                        QueryErrorKind::Semantic,
1054                        format!("Index '{name}' does not exist"),
1055                    )))
1056                }
1057            }
1058            SchemaStatement::CreateConstraint(stmt) => {
1059                use crate::catalog::TypeConstraint;
1060                use grafeo_adapters::query::gql::ast::ConstraintKind;
1061                let kind_str = match stmt.constraint_kind {
1062                    ConstraintKind::Unique => "unique",
1063                    ConstraintKind::NodeKey => "node_key",
1064                    ConstraintKind::NotNull => "not_null",
1065                    ConstraintKind::Exists => "exists",
1066                };
1067                let constraint_name = stmt
1068                    .name
1069                    .clone()
1070                    .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1071
1072                // Register constraint in catalog type definitions
1073                match stmt.constraint_kind {
1074                    ConstraintKind::Unique => {
1075                        for prop in &stmt.properties {
1076                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1077                            let prop_id = self.catalog.get_or_create_property_key(prop);
1078                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1079                        }
1080                        let _ = self.catalog.add_constraint_to_type(
1081                            &stmt.label,
1082                            TypeConstraint::Unique(stmt.properties.clone()),
1083                        );
1084                    }
1085                    ConstraintKind::NodeKey => {
1086                        for prop in &stmt.properties {
1087                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1088                            let prop_id = self.catalog.get_or_create_property_key(prop);
1089                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1090                            let _ = self.catalog.add_required_property(label_id, prop_id);
1091                        }
1092                        let _ = self.catalog.add_constraint_to_type(
1093                            &stmt.label,
1094                            TypeConstraint::PrimaryKey(stmt.properties.clone()),
1095                        );
1096                    }
1097                    ConstraintKind::NotNull | ConstraintKind::Exists => {
1098                        for prop in &stmt.properties {
1099                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1100                            let prop_id = self.catalog.get_or_create_property_key(prop);
1101                            let _ = self.catalog.add_required_property(label_id, prop_id);
1102                            let _ = self.catalog.add_constraint_to_type(
1103                                &stmt.label,
1104                                TypeConstraint::NotNull(prop.clone()),
1105                            );
1106                        }
1107                    }
1108                }
1109
1110                wal_log!(
1111                    self,
1112                    WalRecord::CreateConstraint {
1113                        name: constraint_name.clone(),
1114                        label: stmt.label.clone(),
1115                        properties: stmt.properties.clone(),
1116                        kind: kind_str.to_string(),
1117                    }
1118                );
1119                Ok(QueryResult::status(format!(
1120                    "Created {kind_str} constraint '{constraint_name}'"
1121                )))
1122            }
1123            SchemaStatement::DropConstraint { name, if_exists } => {
1124                let _ = if_exists;
1125                wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1126                Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1127            }
1128            SchemaStatement::CreateGraphType(stmt) => {
1129                use crate::catalog::GraphTypeDefinition;
1130                use grafeo_adapters::query::gql::ast::InlineElementType;
1131
1132                // GG04: LIKE clause copies type from existing graph
1133                let (mut node_types, mut edge_types, open) =
1134                    if let Some(ref like_graph) = stmt.like_graph {
1135                        // Infer types from the graph's bound type, or use its existing types
1136                        if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1137                            if let Some(existing) = self
1138                                .catalog
1139                                .schema()
1140                                .and_then(|s| s.get_graph_type(&type_name))
1141                            {
1142                                (
1143                                    existing.allowed_node_types.clone(),
1144                                    existing.allowed_edge_types.clone(),
1145                                    existing.open,
1146                                )
1147                            } else {
1148                                (Vec::new(), Vec::new(), true)
1149                            }
1150                        } else {
1151                            // GG22: Infer from graph data (labels used in graph)
1152                            let nt = self.catalog.all_node_type_names();
1153                            let et = self.catalog.all_edge_type_names();
1154                            if nt.is_empty() && et.is_empty() {
1155                                (Vec::new(), Vec::new(), true)
1156                            } else {
1157                                (nt, et, false)
1158                            }
1159                        }
1160                    } else {
1161                        (stmt.node_types.clone(), stmt.edge_types.clone(), stmt.open)
1162                    };
1163
1164                // GG03: Register inline element types and add their names
1165                for inline in &stmt.inline_types {
1166                    match inline {
1167                        InlineElementType::Node {
1168                            name,
1169                            properties,
1170                            key_labels,
1171                            ..
1172                        } => {
1173                            let def = NodeTypeDefinition {
1174                                name: name.clone(),
1175                                properties: properties
1176                                    .iter()
1177                                    .map(|p| TypedProperty {
1178                                        name: p.name.clone(),
1179                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1180                                        nullable: p.nullable,
1181                                        default_value: None,
1182                                    })
1183                                    .collect(),
1184                                constraints: Vec::new(),
1185                                parent_types: key_labels.clone(),
1186                            };
1187                            // Register or replace so inline defs override existing
1188                            self.catalog.register_or_replace_node_type(def);
1189                            #[cfg(feature = "wal")]
1190                            {
1191                                let props_for_wal: Vec<(String, String, bool)> = properties
1192                                    .iter()
1193                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1194                                    .collect();
1195                                self.log_schema_wal(&WalRecord::CreateNodeType {
1196                                    name: name.clone(),
1197                                    properties: props_for_wal,
1198                                    constraints: Vec::new(),
1199                                });
1200                            }
1201                            if !node_types.contains(name) {
1202                                node_types.push(name.clone());
1203                            }
1204                        }
1205                        InlineElementType::Edge {
1206                            name,
1207                            properties,
1208                            source_node_types,
1209                            target_node_types,
1210                            ..
1211                        } => {
1212                            let def = EdgeTypeDefinition {
1213                                name: name.clone(),
1214                                properties: properties
1215                                    .iter()
1216                                    .map(|p| TypedProperty {
1217                                        name: p.name.clone(),
1218                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1219                                        nullable: p.nullable,
1220                                        default_value: None,
1221                                    })
1222                                    .collect(),
1223                                constraints: Vec::new(),
1224                                source_node_types: source_node_types.clone(),
1225                                target_node_types: target_node_types.clone(),
1226                            };
1227                            self.catalog.register_or_replace_edge_type_def(def);
1228                            #[cfg(feature = "wal")]
1229                            {
1230                                let props_for_wal: Vec<(String, String, bool)> = properties
1231                                    .iter()
1232                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1233                                    .collect();
1234                                self.log_schema_wal(&WalRecord::CreateEdgeType {
1235                                    name: name.clone(),
1236                                    properties: props_for_wal,
1237                                    constraints: Vec::new(),
1238                                });
1239                            }
1240                            if !edge_types.contains(name) {
1241                                edge_types.push(name.clone());
1242                            }
1243                        }
1244                    }
1245                }
1246
1247                let def = GraphTypeDefinition {
1248                    name: stmt.name.clone(),
1249                    allowed_node_types: node_types.clone(),
1250                    allowed_edge_types: edge_types.clone(),
1251                    open,
1252                };
1253                let result = if stmt.or_replace {
1254                    // Drop existing first, ignore error if not found
1255                    let _ = self.catalog.drop_graph_type(&stmt.name);
1256                    self.catalog.register_graph_type(def)
1257                } else {
1258                    self.catalog.register_graph_type(def)
1259                };
1260                match result {
1261                    Ok(()) => {
1262                        wal_log!(
1263                            self,
1264                            WalRecord::CreateGraphType {
1265                                name: stmt.name.clone(),
1266                                node_types,
1267                                edge_types,
1268                                open,
1269                            }
1270                        );
1271                        Ok(QueryResult::status(format!(
1272                            "Created graph type '{}'",
1273                            stmt.name
1274                        )))
1275                    }
1276                    Err(e) if stmt.if_not_exists => {
1277                        let _ = e;
1278                        Ok(QueryResult::status("No change"))
1279                    }
1280                    Err(e) => Err(Error::Query(QueryError::new(
1281                        QueryErrorKind::Semantic,
1282                        e.to_string(),
1283                    ))),
1284                }
1285            }
1286            SchemaStatement::DropGraphType { name, if_exists } => {
1287                match self.catalog.drop_graph_type(&name) {
1288                    Ok(()) => {
1289                        wal_log!(self, WalRecord::DropGraphType { name: name.clone() });
1290                        Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1291                    }
1292                    Err(e) if if_exists => {
1293                        let _ = e;
1294                        Ok(QueryResult::status("No change"))
1295                    }
1296                    Err(e) => Err(Error::Query(QueryError::new(
1297                        QueryErrorKind::Semantic,
1298                        e.to_string(),
1299                    ))),
1300                }
1301            }
1302            SchemaStatement::CreateSchema {
1303                name,
1304                if_not_exists,
1305            } => match self.catalog.register_schema_namespace(name.clone()) {
1306                Ok(()) => {
1307                    wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1308                    Ok(QueryResult::status(format!("Created schema '{name}'")))
1309                }
1310                Err(e) if if_not_exists => {
1311                    let _ = e;
1312                    Ok(QueryResult::status("No change"))
1313                }
1314                Err(e) => Err(Error::Query(QueryError::new(
1315                    QueryErrorKind::Semantic,
1316                    e.to_string(),
1317                ))),
1318            },
1319            SchemaStatement::DropSchema { name, if_exists } => {
1320                // ISO/IEC 39075 Section 12.3: schema must be empty before dropping
1321                let prefix = format!("{name}/");
1322                let has_graphs = self
1323                    .store
1324                    .graph_names()
1325                    .iter()
1326                    .any(|g| g.starts_with(&prefix));
1327                if has_graphs {
1328                    return Err(Error::Query(QueryError::new(
1329                        QueryErrorKind::Semantic,
1330                        format!(
1331                            "Schema '{name}' is not empty: drop all graphs in the schema first"
1332                        ),
1333                    )));
1334                }
1335                match self.catalog.drop_schema_namespace(&name) {
1336                    Ok(()) => {
1337                        wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1338                        // If this session was using the dropped schema, reset it
1339                        let mut current = self.current_schema.lock();
1340                        if current
1341                            .as_deref()
1342                            .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1343                        {
1344                            *current = None;
1345                        }
1346                        Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1347                    }
1348                    Err(e) if if_exists => {
1349                        let _ = e;
1350                        Ok(QueryResult::status("No change"))
1351                    }
1352                    Err(e) => Err(Error::Query(QueryError::new(
1353                        QueryErrorKind::Semantic,
1354                        e.to_string(),
1355                    ))),
1356                }
1357            }
1358            SchemaStatement::AlterNodeType(stmt) => {
1359                use grafeo_adapters::query::gql::ast::TypeAlteration;
1360                let mut wal_alts = Vec::new();
1361                for alt in &stmt.alterations {
1362                    match alt {
1363                        TypeAlteration::AddProperty(prop) => {
1364                            let typed = TypedProperty {
1365                                name: prop.name.clone(),
1366                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1367                                nullable: prop.nullable,
1368                                default_value: prop
1369                                    .default_value
1370                                    .as_ref()
1371                                    .map(|s| parse_default_literal(s)),
1372                            };
1373                            self.catalog
1374                                .alter_node_type_add_property(&stmt.name, typed)
1375                                .map_err(|e| {
1376                                    Error::Query(QueryError::new(
1377                                        QueryErrorKind::Semantic,
1378                                        e.to_string(),
1379                                    ))
1380                                })?;
1381                            wal_alts.push((
1382                                "add".to_string(),
1383                                prop.name.clone(),
1384                                prop.data_type.clone(),
1385                                prop.nullable,
1386                            ));
1387                        }
1388                        TypeAlteration::DropProperty(name) => {
1389                            self.catalog
1390                                .alter_node_type_drop_property(&stmt.name, name)
1391                                .map_err(|e| {
1392                                    Error::Query(QueryError::new(
1393                                        QueryErrorKind::Semantic,
1394                                        e.to_string(),
1395                                    ))
1396                                })?;
1397                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1398                        }
1399                    }
1400                }
1401                wal_log!(
1402                    self,
1403                    WalRecord::AlterNodeType {
1404                        name: stmt.name.clone(),
1405                        alterations: wal_alts,
1406                    }
1407                );
1408                Ok(QueryResult::status(format!(
1409                    "Altered node type '{}'",
1410                    stmt.name
1411                )))
1412            }
1413            SchemaStatement::AlterEdgeType(stmt) => {
1414                use grafeo_adapters::query::gql::ast::TypeAlteration;
1415                let mut wal_alts = Vec::new();
1416                for alt in &stmt.alterations {
1417                    match alt {
1418                        TypeAlteration::AddProperty(prop) => {
1419                            let typed = TypedProperty {
1420                                name: prop.name.clone(),
1421                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1422                                nullable: prop.nullable,
1423                                default_value: prop
1424                                    .default_value
1425                                    .as_ref()
1426                                    .map(|s| parse_default_literal(s)),
1427                            };
1428                            self.catalog
1429                                .alter_edge_type_add_property(&stmt.name, typed)
1430                                .map_err(|e| {
1431                                    Error::Query(QueryError::new(
1432                                        QueryErrorKind::Semantic,
1433                                        e.to_string(),
1434                                    ))
1435                                })?;
1436                            wal_alts.push((
1437                                "add".to_string(),
1438                                prop.name.clone(),
1439                                prop.data_type.clone(),
1440                                prop.nullable,
1441                            ));
1442                        }
1443                        TypeAlteration::DropProperty(name) => {
1444                            self.catalog
1445                                .alter_edge_type_drop_property(&stmt.name, name)
1446                                .map_err(|e| {
1447                                    Error::Query(QueryError::new(
1448                                        QueryErrorKind::Semantic,
1449                                        e.to_string(),
1450                                    ))
1451                                })?;
1452                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1453                        }
1454                    }
1455                }
1456                wal_log!(
1457                    self,
1458                    WalRecord::AlterEdgeType {
1459                        name: stmt.name.clone(),
1460                        alterations: wal_alts,
1461                    }
1462                );
1463                Ok(QueryResult::status(format!(
1464                    "Altered edge type '{}'",
1465                    stmt.name
1466                )))
1467            }
1468            SchemaStatement::AlterGraphType(stmt) => {
1469                use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1470                let mut wal_alts = Vec::new();
1471                for alt in &stmt.alterations {
1472                    match alt {
1473                        GraphTypeAlteration::AddNodeType(name) => {
1474                            self.catalog
1475                                .alter_graph_type_add_node_type(&stmt.name, name.clone())
1476                                .map_err(|e| {
1477                                    Error::Query(QueryError::new(
1478                                        QueryErrorKind::Semantic,
1479                                        e.to_string(),
1480                                    ))
1481                                })?;
1482                            wal_alts.push(("add_node_type".to_string(), name.clone()));
1483                        }
1484                        GraphTypeAlteration::DropNodeType(name) => {
1485                            self.catalog
1486                                .alter_graph_type_drop_node_type(&stmt.name, name)
1487                                .map_err(|e| {
1488                                    Error::Query(QueryError::new(
1489                                        QueryErrorKind::Semantic,
1490                                        e.to_string(),
1491                                    ))
1492                                })?;
1493                            wal_alts.push(("drop_node_type".to_string(), name.clone()));
1494                        }
1495                        GraphTypeAlteration::AddEdgeType(name) => {
1496                            self.catalog
1497                                .alter_graph_type_add_edge_type(&stmt.name, name.clone())
1498                                .map_err(|e| {
1499                                    Error::Query(QueryError::new(
1500                                        QueryErrorKind::Semantic,
1501                                        e.to_string(),
1502                                    ))
1503                                })?;
1504                            wal_alts.push(("add_edge_type".to_string(), name.clone()));
1505                        }
1506                        GraphTypeAlteration::DropEdgeType(name) => {
1507                            self.catalog
1508                                .alter_graph_type_drop_edge_type(&stmt.name, name)
1509                                .map_err(|e| {
1510                                    Error::Query(QueryError::new(
1511                                        QueryErrorKind::Semantic,
1512                                        e.to_string(),
1513                                    ))
1514                                })?;
1515                            wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1516                        }
1517                    }
1518                }
1519                wal_log!(
1520                    self,
1521                    WalRecord::AlterGraphType {
1522                        name: stmt.name.clone(),
1523                        alterations: wal_alts,
1524                    }
1525                );
1526                Ok(QueryResult::status(format!(
1527                    "Altered graph type '{}'",
1528                    stmt.name
1529                )))
1530            }
1531            SchemaStatement::CreateProcedure(stmt) => {
1532                use crate::catalog::ProcedureDefinition;
1533
1534                let def = ProcedureDefinition {
1535                    name: stmt.name.clone(),
1536                    params: stmt
1537                        .params
1538                        .iter()
1539                        .map(|p| (p.name.clone(), p.param_type.clone()))
1540                        .collect(),
1541                    returns: stmt
1542                        .returns
1543                        .iter()
1544                        .map(|r| (r.name.clone(), r.return_type.clone()))
1545                        .collect(),
1546                    body: stmt.body.clone(),
1547                };
1548
1549                if stmt.or_replace {
1550                    self.catalog.replace_procedure(def).map_err(|e| {
1551                        Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1552                    })?;
1553                } else {
1554                    match self.catalog.register_procedure(def) {
1555                        Ok(()) => {}
1556                        Err(_) if stmt.if_not_exists => {
1557                            return Ok(QueryResult::empty());
1558                        }
1559                        Err(e) => {
1560                            return Err(Error::Query(QueryError::new(
1561                                QueryErrorKind::Semantic,
1562                                e.to_string(),
1563                            )));
1564                        }
1565                    }
1566                }
1567
1568                wal_log!(
1569                    self,
1570                    WalRecord::CreateProcedure {
1571                        name: stmt.name.clone(),
1572                        params: stmt
1573                            .params
1574                            .iter()
1575                            .map(|p| (p.name.clone(), p.param_type.clone()))
1576                            .collect(),
1577                        returns: stmt
1578                            .returns
1579                            .iter()
1580                            .map(|r| (r.name.clone(), r.return_type.clone()))
1581                            .collect(),
1582                        body: stmt.body,
1583                    }
1584                );
1585                Ok(QueryResult::status(format!(
1586                    "Created procedure '{}'",
1587                    stmt.name
1588                )))
1589            }
1590            SchemaStatement::DropProcedure { name, if_exists } => {
1591                match self.catalog.drop_procedure(&name) {
1592                    Ok(()) => {}
1593                    Err(_) if if_exists => {
1594                        return Ok(QueryResult::empty());
1595                    }
1596                    Err(e) => {
1597                        return Err(Error::Query(QueryError::new(
1598                            QueryErrorKind::Semantic,
1599                            e.to_string(),
1600                        )));
1601                    }
1602                }
1603                wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1604                Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1605            }
1606            SchemaStatement::ShowIndexes => {
1607                return self.execute_show_indexes();
1608            }
1609            SchemaStatement::ShowConstraints => {
1610                return self.execute_show_constraints();
1611            }
1612            SchemaStatement::ShowNodeTypes => {
1613                return self.execute_show_node_types();
1614            }
1615            SchemaStatement::ShowEdgeTypes => {
1616                return self.execute_show_edge_types();
1617            }
1618            SchemaStatement::ShowGraphTypes => {
1619                return self.execute_show_graph_types();
1620            }
1621            SchemaStatement::ShowGraphType(name) => {
1622                return self.execute_show_graph_type(&name);
1623            }
1624            SchemaStatement::ShowCurrentGraphType => {
1625                return self.execute_show_current_graph_type();
1626            }
1627            SchemaStatement::ShowGraphs => {
1628                return self.execute_show_graphs();
1629            }
1630            SchemaStatement::ShowSchemas => {
1631                return self.execute_show_schemas();
1632            }
1633        };
1634
1635        // Invalidate all cached query plans after any successful DDL change.
1636        // DDL is rare, so clearing the entire cache is cheap and correct.
1637        if result.is_ok() {
1638            self.query_cache.clear();
1639        }
1640
1641        result
1642    }
1643
1644    /// Creates a vector index on the store by scanning existing nodes.
1645    #[cfg(all(feature = "gql", feature = "vector-index"))]
1646    fn create_vector_index_on_store(
1647        store: &LpgStore,
1648        label: &str,
1649        property: &str,
1650        dimensions: Option<usize>,
1651        metric: Option<&str>,
1652    ) -> Result<()> {
1653        use grafeo_common::types::{PropertyKey, Value};
1654        use grafeo_common::utils::error::Error;
1655        use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};
1656
1657        let metric = match metric {
1658            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1659                Error::Internal(format!(
1660                    "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
1661                ))
1662            })?,
1663            None => DistanceMetric::Cosine,
1664        };
1665
1666        let prop_key = PropertyKey::new(property);
1667        let mut found_dims: Option<usize> = dimensions;
1668        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1669
1670        for node in store.nodes_with_label(label) {
1671            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1672                if let Some(expected) = found_dims {
1673                    if v.len() != expected {
1674                        return Err(Error::Internal(format!(
1675                            "Vector dimension mismatch: expected {expected}, found {} on node {}",
1676                            v.len(),
1677                            node.id.0
1678                        )));
1679                    }
1680                } else {
1681                    found_dims = Some(v.len());
1682                }
1683                vectors.push((node.id, v.to_vec()));
1684            }
1685        }
1686
1687        let Some(dims) = found_dims else {
1688            return Err(Error::Internal(format!(
1689                "No vector properties found on :{label}({property}) and no dimensions specified"
1690            )));
1691        };
1692
1693        let config = HnswConfig::new(dims, metric);
1694        let index = HnswIndex::with_capacity(config, vectors.len());
1695        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
1696        for (node_id, vec) in &vectors {
1697            index.insert(*node_id, vec, &accessor);
1698        }
1699
1700        store.add_vector_index(label, property, Arc::new(index));
1701        Ok(())
1702    }
1703
1704    /// Stub for when vector-index feature is not enabled.
1705    #[cfg(all(feature = "gql", not(feature = "vector-index")))]
1706    fn create_vector_index_on_store(
1707        _store: &LpgStore,
1708        _label: &str,
1709        _property: &str,
1710        _dimensions: Option<usize>,
1711        _metric: Option<&str>,
1712    ) -> Result<()> {
1713        Err(grafeo_common::utils::error::Error::Internal(
1714            "Vector index support requires the 'vector-index' feature".to_string(),
1715        ))
1716    }
1717
1718    /// Creates a text index on the store by scanning existing nodes.
1719    #[cfg(all(feature = "gql", feature = "text-index"))]
1720    fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
1721        use grafeo_common::types::{PropertyKey, Value};
1722        use grafeo_core::index::text::{BM25Config, InvertedIndex};
1723
1724        let mut index = InvertedIndex::new(BM25Config::default());
1725        let prop_key = PropertyKey::new(property);
1726
1727        let nodes = store.nodes_by_label(label);
1728        for node_id in nodes {
1729            if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
1730                index.insert(node_id, text.as_str());
1731            }
1732        }
1733
1734        store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
1735        Ok(())
1736    }
1737
1738    /// Stub for when text-index feature is not enabled.
1739    #[cfg(all(feature = "gql", not(feature = "text-index")))]
1740    fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
1741        Err(grafeo_common::utils::error::Error::Internal(
1742            "Text index support requires the 'text-index' feature".to_string(),
1743        ))
1744    }
1745
1746    /// Returns a table of all indexes from the catalog.
1747    fn execute_show_indexes(&self) -> Result<QueryResult> {
1748        let indexes = self.catalog.all_indexes();
1749        let columns = vec![
1750            "name".to_string(),
1751            "type".to_string(),
1752            "label".to_string(),
1753            "property".to_string(),
1754        ];
1755        let rows: Vec<Vec<Value>> = indexes
1756            .into_iter()
1757            .map(|def| {
1758                let label_name = self
1759                    .catalog
1760                    .get_label_name(def.label)
1761                    .unwrap_or_else(|| "?".into());
1762                let prop_name = self
1763                    .catalog
1764                    .get_property_key_name(def.property_key)
1765                    .unwrap_or_else(|| "?".into());
1766                vec![
1767                    Value::from(format!("idx_{}_{}", label_name, prop_name)),
1768                    Value::from(format!("{:?}", def.index_type)),
1769                    Value::from(&*label_name),
1770                    Value::from(&*prop_name),
1771                ]
1772            })
1773            .collect();
1774        Ok(QueryResult {
1775            columns,
1776            column_types: Vec::new(),
1777            rows,
1778            ..QueryResult::empty()
1779        })
1780    }
1781
1782    /// Returns a table of all constraints (currently metadata-only).
1783    fn execute_show_constraints(&self) -> Result<QueryResult> {
1784        // Constraints are tracked in WAL but not yet in a queryable catalog.
1785        // Return an empty table with the expected schema.
1786        Ok(QueryResult {
1787            columns: vec![
1788                "name".to_string(),
1789                "type".to_string(),
1790                "label".to_string(),
1791                "properties".to_string(),
1792            ],
1793            column_types: Vec::new(),
1794            rows: Vec::new(),
1795            ..QueryResult::empty()
1796        })
1797    }
1798
1799    /// Returns a table of all registered node types.
1800    fn execute_show_node_types(&self) -> Result<QueryResult> {
1801        let columns = vec![
1802            "name".to_string(),
1803            "properties".to_string(),
1804            "constraints".to_string(),
1805            "parents".to_string(),
1806        ];
1807        let type_names = self.catalog.all_node_type_names();
1808        let rows: Vec<Vec<Value>> = type_names
1809            .into_iter()
1810            .filter_map(|name| {
1811                let def = self.catalog.get_node_type(&name)?;
1812                let props: Vec<String> = def
1813                    .properties
1814                    .iter()
1815                    .map(|p| {
1816                        let nullable = if p.nullable { "" } else { " NOT NULL" };
1817                        format!("{} {}{}", p.name, p.data_type, nullable)
1818                    })
1819                    .collect();
1820                let constraints: Vec<String> =
1821                    def.constraints.iter().map(|c| format!("{c:?}")).collect();
1822                let parents = def.parent_types.join(", ");
1823                Some(vec![
1824                    Value::from(name),
1825                    Value::from(props.join(", ")),
1826                    Value::from(constraints.join(", ")),
1827                    Value::from(parents),
1828                ])
1829            })
1830            .collect();
1831        Ok(QueryResult {
1832            columns,
1833            column_types: Vec::new(),
1834            rows,
1835            ..QueryResult::empty()
1836        })
1837    }
1838
1839    /// Returns a table of all registered edge types.
1840    fn execute_show_edge_types(&self) -> Result<QueryResult> {
1841        let columns = vec![
1842            "name".to_string(),
1843            "properties".to_string(),
1844            "source_types".to_string(),
1845            "target_types".to_string(),
1846        ];
1847        let type_names = self.catalog.all_edge_type_names();
1848        let rows: Vec<Vec<Value>> = type_names
1849            .into_iter()
1850            .filter_map(|name| {
1851                let def = self.catalog.get_edge_type_def(&name)?;
1852                let props: Vec<String> = def
1853                    .properties
1854                    .iter()
1855                    .map(|p| {
1856                        let nullable = if p.nullable { "" } else { " NOT NULL" };
1857                        format!("{} {}{}", p.name, p.data_type, nullable)
1858                    })
1859                    .collect();
1860                let src = def.source_node_types.join(", ");
1861                let tgt = def.target_node_types.join(", ");
1862                Some(vec![
1863                    Value::from(name),
1864                    Value::from(props.join(", ")),
1865                    Value::from(src),
1866                    Value::from(tgt),
1867                ])
1868            })
1869            .collect();
1870        Ok(QueryResult {
1871            columns,
1872            column_types: Vec::new(),
1873            rows,
1874            ..QueryResult::empty()
1875        })
1876    }
1877
1878    /// Returns a table of all registered graph types.
1879    fn execute_show_graph_types(&self) -> Result<QueryResult> {
1880        let columns = vec![
1881            "name".to_string(),
1882            "open".to_string(),
1883            "node_types".to_string(),
1884            "edge_types".to_string(),
1885        ];
1886        let type_names = self.catalog.all_graph_type_names();
1887        let rows: Vec<Vec<Value>> = type_names
1888            .into_iter()
1889            .filter_map(|name| {
1890                let def = self.catalog.get_graph_type_def(&name)?;
1891                Some(vec![
1892                    Value::from(name),
1893                    Value::from(def.open),
1894                    Value::from(def.allowed_node_types.join(", ")),
1895                    Value::from(def.allowed_edge_types.join(", ")),
1896                ])
1897            })
1898            .collect();
1899        Ok(QueryResult {
1900            columns,
1901            column_types: Vec::new(),
1902            rows,
1903            ..QueryResult::empty()
1904        })
1905    }
1906
1907    /// Returns the list of named graphs visible in the current schema context.
1908    ///
1909    /// When a session schema is set, only graphs belonging to that schema are
1910    /// shown (their compound prefix is stripped). When no schema is set, graphs
1911    /// without a schema prefix are shown (the default schema).
1912    fn execute_show_graphs(&self) -> Result<QueryResult> {
1913        let schema = self.current_schema.lock().clone();
1914        let all_names = self.store.graph_names();
1915
1916        let mut names: Vec<String> = match &schema {
1917            Some(s) => {
1918                let prefix = format!("{s}/");
1919                all_names
1920                    .into_iter()
1921                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
1922                    .collect()
1923            }
1924            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
1925        };
1926        names.sort();
1927
1928        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
1929        Ok(QueryResult {
1930            columns: vec!["name".to_string()],
1931            column_types: Vec::new(),
1932            rows,
1933            ..QueryResult::empty()
1934        })
1935    }
1936
1937    /// Returns the list of all schema namespaces.
1938    fn execute_show_schemas(&self) -> Result<QueryResult> {
1939        let mut names = self.catalog.schema_names();
1940        names.sort();
1941        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
1942        Ok(QueryResult {
1943            columns: vec!["name".to_string()],
1944            column_types: Vec::new(),
1945            rows,
1946            ..QueryResult::empty()
1947        })
1948    }
1949
1950    /// Returns detailed info for a specific graph type.
1951    fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
1952        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
1953
1954        let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
1955            Error::Query(QueryError::new(
1956                QueryErrorKind::Semantic,
1957                format!("Graph type '{name}' not found"),
1958            ))
1959        })?;
1960
1961        let columns = vec![
1962            "name".to_string(),
1963            "open".to_string(),
1964            "node_types".to_string(),
1965            "edge_types".to_string(),
1966        ];
1967        let rows = vec![vec![
1968            Value::from(def.name),
1969            Value::from(def.open),
1970            Value::from(def.allowed_node_types.join(", ")),
1971            Value::from(def.allowed_edge_types.join(", ")),
1972        ]];
1973        Ok(QueryResult {
1974            columns,
1975            column_types: Vec::new(),
1976            rows,
1977            ..QueryResult::empty()
1978        })
1979    }
1980
1981    /// Returns the graph type bound to the current graph.
1982    fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
1983        let graph_name = self
1984            .current_graph()
1985            .unwrap_or_else(|| "default".to_string());
1986        let columns = vec![
1987            "graph".to_string(),
1988            "graph_type".to_string(),
1989            "open".to_string(),
1990            "node_types".to_string(),
1991            "edge_types".to_string(),
1992        ];
1993
1994        if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
1995            && let Some(def) = self.catalog.get_graph_type_def(&type_name)
1996        {
1997            let rows = vec![vec![
1998                Value::from(graph_name),
1999                Value::from(type_name),
2000                Value::from(def.open),
2001                Value::from(def.allowed_node_types.join(", ")),
2002                Value::from(def.allowed_edge_types.join(", ")),
2003            ]];
2004            return Ok(QueryResult {
2005                columns,
2006                column_types: Vec::new(),
2007                rows,
2008                ..QueryResult::empty()
2009            });
2010        }
2011
2012        // No graph type binding found
2013        Ok(QueryResult {
2014            columns,
2015            column_types: Vec::new(),
2016            rows: vec![vec![
2017                Value::from(graph_name),
2018                Value::Null,
2019                Value::Null,
2020                Value::Null,
2021                Value::Null,
2022            ]],
2023            ..QueryResult::empty()
2024        })
2025    }
2026
2027    /// Executes a GQL query.
2028    ///
2029    /// # Errors
2030    ///
2031    /// Returns an error if the query fails to parse or execute.
2032    ///
2033    /// # Examples
2034    ///
2035    /// ```no_run
2036    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2037    /// use grafeo_engine::GrafeoDB;
2038    ///
2039    /// let db = GrafeoDB::new_in_memory();
2040    /// let session = db.session();
2041    ///
2042    /// // Create a node
2043    /// session.execute("INSERT (:Person {name: 'Alix', age: 30})")?;
2044    ///
2045    /// // Query nodes
2046    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
2047    /// for row in &result.rows {
2048    ///     println!("{:?}", row);
2049    /// }
2050    /// # Ok(())
2051    /// # }
2052    /// ```
2053    #[cfg(feature = "gql")]
2054    pub fn execute(&self, query: &str) -> Result<QueryResult> {
2055        self.require_lpg("GQL")?;
2056
2057        use crate::query::{
2058            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2059            processor::QueryLanguage, translators::gql,
2060        };
2061
2062        let _span = tracing::info_span!(
2063            "grafeo::session::execute",
2064            language = "gql",
2065            query_len = query.len(),
2066        )
2067        .entered();
2068
2069        #[cfg(not(target_arch = "wasm32"))]
2070        let start_time = std::time::Instant::now();
2071
2072        // Parse and translate, checking for session/schema commands first
2073        let translation = gql::translate_full(query)?;
2074        let logical_plan = match translation {
2075            gql::GqlTranslationResult::SessionCommand(cmd) => {
2076                return self.execute_session_command(cmd);
2077            }
2078            gql::GqlTranslationResult::SchemaCommand(cmd) => {
2079                // All DDL is a write operation
2080                if *self.read_only_tx.lock() {
2081                    return Err(grafeo_common::utils::error::Error::Transaction(
2082                        grafeo_common::utils::error::TransactionError::ReadOnly,
2083                    ));
2084                }
2085                return self.execute_schema_command(cmd);
2086            }
2087            gql::GqlTranslationResult::Plan(plan) => {
2088                // Block mutations in read-only transactions
2089                if *self.read_only_tx.lock() && plan.root.has_mutations() {
2090                    return Err(grafeo_common::utils::error::Error::Transaction(
2091                        grafeo_common::utils::error::TransactionError::ReadOnly,
2092                    ));
2093                }
2094                plan
2095            }
2096        };
2097
2098        // Create cache key for this query
2099        let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
2100
2101        // Try to get cached optimized plan, or use the plan we just translated
2102        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2103            cached_plan
2104        } else {
2105            // Semantic validation
2106            let mut binder = Binder::new();
2107            let _binding_context = binder.bind(&logical_plan)?;
2108
2109            // Optimize the plan
2110            let active = self.active_store();
2111            let optimizer = Optimizer::from_graph_store(&*active);
2112            let plan = optimizer.optimize(logical_plan)?;
2113
2114            // Cache the optimized plan for future use
2115            self.query_cache.put_optimized(cache_key, plan.clone());
2116
2117            plan
2118        };
2119
2120        // Resolve the active store for query execution
2121        let active = self.active_store();
2122
2123        // EXPLAIN: annotate pushdown hints and return the plan tree
2124        if optimized_plan.explain {
2125            use crate::query::processor::{annotate_pushdown_hints, explain_result};
2126            let mut plan = optimized_plan;
2127            annotate_pushdown_hints(&mut plan.root, active.as_ref());
2128            return Ok(explain_result(&plan));
2129        }
2130
2131        // PROFILE: execute with per-operator instrumentation
2132        if optimized_plan.profile {
2133            let has_mutations = optimized_plan.root.has_mutations();
2134            return self.with_auto_commit(has_mutations, || {
2135                let (viewing_epoch, transaction_id) = self.get_transaction_context();
2136                let planner = self.create_planner_for_store(
2137                    Arc::clone(&active),
2138                    viewing_epoch,
2139                    transaction_id,
2140                );
2141                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2142
2143                let executor = Executor::with_columns(physical_plan.columns.clone())
2144                    .with_deadline(self.query_deadline());
2145                let _result = executor.execute(physical_plan.operator.as_mut())?;
2146
2147                let total_time_ms;
2148                #[cfg(not(target_arch = "wasm32"))]
2149                {
2150                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2151                }
2152                #[cfg(target_arch = "wasm32")]
2153                {
2154                    total_time_ms = 0.0;
2155                }
2156
2157                let profile_tree = crate::query::profile::build_profile_tree(
2158                    &optimized_plan.root,
2159                    &mut entries.into_iter(),
2160                );
2161                Ok(crate::query::profile::profile_result(
2162                    &profile_tree,
2163                    total_time_ms,
2164                ))
2165            });
2166        }
2167
2168        let has_mutations = optimized_plan.root.has_mutations();
2169
2170        let result = self.with_auto_commit(has_mutations, || {
2171            // Get transaction context for MVCC visibility
2172            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2173
2174            // Convert to physical plan with transaction context
2175            // (Physical planning cannot be cached as it depends on transaction state)
2176            // Safe to use read-only fast path when: this query has no mutations AND
2177            // there is no active transaction that may have prior uncommitted writes.
2178            let has_active_tx = self.current_transaction.lock().is_some();
2179            let read_only = !has_mutations && !has_active_tx;
2180            let planner = self.create_planner_for_store_with_read_only(
2181                Arc::clone(&active),
2182                viewing_epoch,
2183                transaction_id,
2184                read_only,
2185            );
2186            let mut physical_plan = planner.plan(&optimized_plan)?;
2187
2188            // Execute the plan
2189            let executor = Executor::with_columns(physical_plan.columns.clone())
2190                .with_deadline(self.query_deadline());
2191            let mut result = executor.execute(physical_plan.operator.as_mut())?;
2192
2193            // Add execution metrics
2194            let rows_scanned = result.rows.len() as u64;
2195            #[cfg(not(target_arch = "wasm32"))]
2196            {
2197                let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2198                result.execution_time_ms = Some(elapsed_ms);
2199            }
2200            result.rows_scanned = Some(rows_scanned);
2201
2202            Ok(result)
2203        });
2204
2205        // Record metrics for this query execution.
2206        #[cfg(feature = "metrics")]
2207        {
2208            #[cfg(not(target_arch = "wasm32"))]
2209            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2210            #[cfg(target_arch = "wasm32")]
2211            let elapsed_ms = None;
2212            self.record_query_metrics("gql", elapsed_ms, &result);
2213        }
2214
2215        result
2216    }
2217
2218    /// Executes a GQL query with visibility at the specified epoch.
2219    ///
2220    /// This enables time-travel queries: the query sees the database
2221    /// as it existed at the given epoch.
2222    ///
2223    /// # Errors
2224    ///
2225    /// Returns an error if parsing or execution fails.
2226    #[cfg(feature = "gql")]
2227    pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
2228        let previous = self.viewing_epoch_override.lock().replace(epoch);
2229        let result = self.execute(query);
2230        *self.viewing_epoch_override.lock() = previous;
2231        result
2232    }
2233
2234    /// Executes a GQL query with parameters.
2235    ///
2236    /// # Errors
2237    ///
2238    /// Returns an error if the query fails to parse or execute.
2239    #[cfg(feature = "gql")]
2240    pub fn execute_with_params(
2241        &self,
2242        query: &str,
2243        params: std::collections::HashMap<String, Value>,
2244    ) -> Result<QueryResult> {
2245        self.require_lpg("GQL")?;
2246
2247        use crate::query::processor::{QueryLanguage, QueryProcessor};
2248
2249        let has_mutations = Self::query_looks_like_mutation(query);
2250        let active = self.active_store();
2251
2252        self.with_auto_commit(has_mutations, || {
2253            // Get transaction context for MVCC visibility
2254            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2255
2256            // Create processor with transaction context
2257            let processor = QueryProcessor::for_graph_store_with_transaction(
2258                Arc::clone(&active),
2259                Arc::clone(&self.transaction_manager),
2260            )?;
2261
2262            // Apply transaction context if in a transaction
2263            let processor = if let Some(transaction_id) = transaction_id {
2264                processor.with_transaction_context(viewing_epoch, transaction_id)
2265            } else {
2266                processor
2267            };
2268
2269            processor.process(query, QueryLanguage::Gql, Some(&params))
2270        })
2271    }
2272
2273    /// Executes a GQL query with parameters.
2274    ///
2275    /// # Errors
2276    ///
2277    /// Returns an error if no query language is enabled.
2278    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2279    pub fn execute_with_params(
2280        &self,
2281        _query: &str,
2282        _params: std::collections::HashMap<String, Value>,
2283    ) -> Result<QueryResult> {
2284        Err(grafeo_common::utils::error::Error::Internal(
2285            "No query language enabled".to_string(),
2286        ))
2287    }
2288
2289    /// Executes a GQL query.
2290    ///
2291    /// # Errors
2292    ///
2293    /// Returns an error if no query language is enabled.
2294    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2295    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2296        Err(grafeo_common::utils::error::Error::Internal(
2297            "No query language enabled".to_string(),
2298        ))
2299    }
2300
2301    /// Executes a Cypher query.
2302    ///
2303    /// # Errors
2304    ///
2305    /// Returns an error if the query fails to parse or execute.
2306    #[cfg(feature = "cypher")]
2307    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
2308        use crate::query::{
2309            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2310            processor::QueryLanguage, translators::cypher,
2311        };
2312        use grafeo_common::utils::error::{Error as GrafeoError, QueryError, QueryErrorKind};
2313
2314        // Handle schema DDL and SHOW commands before the normal query path
2315        let translation = cypher::translate_full(query)?;
2316        match translation {
2317            cypher::CypherTranslationResult::SchemaCommand(cmd) => {
2318                if *self.read_only_tx.lock() {
2319                    return Err(GrafeoError::Query(QueryError::new(
2320                        QueryErrorKind::Semantic,
2321                        "Cannot execute schema DDL in a read-only transaction",
2322                    )));
2323                }
2324                return self.execute_schema_command(cmd);
2325            }
2326            cypher::CypherTranslationResult::ShowIndexes => {
2327                return self.execute_show_indexes();
2328            }
2329            cypher::CypherTranslationResult::ShowConstraints => {
2330                return self.execute_show_constraints();
2331            }
2332            cypher::CypherTranslationResult::ShowCurrentGraphType => {
2333                return self.execute_show_current_graph_type();
2334            }
2335            cypher::CypherTranslationResult::Plan(_) => {
2336                // Fall through to normal execution below
2337            }
2338        }
2339
2340        #[cfg(not(target_arch = "wasm32"))]
2341        let start_time = std::time::Instant::now();
2342
2343        // Create cache key for this query
2344        let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());
2345
2346        // Try to get cached optimized plan
2347        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2348            cached_plan
2349        } else {
2350            // Parse and translate the query to a logical plan
2351            let logical_plan = cypher::translate(query)?;
2352
2353            // Semantic validation
2354            let mut binder = Binder::new();
2355            let _binding_context = binder.bind(&logical_plan)?;
2356
2357            // Optimize the plan
2358            let active = self.active_store();
2359            let optimizer = Optimizer::from_graph_store(&*active);
2360            let plan = optimizer.optimize(logical_plan)?;
2361
2362            // Cache the optimized plan
2363            self.query_cache.put_optimized(cache_key, plan.clone());
2364
2365            plan
2366        };
2367
2368        // Resolve the active store for query execution
2369        let active = self.active_store();
2370
2371        // EXPLAIN
2372        if optimized_plan.explain {
2373            use crate::query::processor::{annotate_pushdown_hints, explain_result};
2374            let mut plan = optimized_plan;
2375            annotate_pushdown_hints(&mut plan.root, active.as_ref());
2376            return Ok(explain_result(&plan));
2377        }
2378
2379        // PROFILE
2380        if optimized_plan.profile {
2381            let has_mutations = optimized_plan.root.has_mutations();
2382            return self.with_auto_commit(has_mutations, || {
2383                let (viewing_epoch, transaction_id) = self.get_transaction_context();
2384                let planner = self.create_planner_for_store(
2385                    Arc::clone(&active),
2386                    viewing_epoch,
2387                    transaction_id,
2388                );
2389                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2390
2391                let executor = Executor::with_columns(physical_plan.columns.clone())
2392                    .with_deadline(self.query_deadline());
2393                let _result = executor.execute(physical_plan.operator.as_mut())?;
2394
2395                let total_time_ms;
2396                #[cfg(not(target_arch = "wasm32"))]
2397                {
2398                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2399                }
2400                #[cfg(target_arch = "wasm32")]
2401                {
2402                    total_time_ms = 0.0;
2403                }
2404
2405                let profile_tree = crate::query::profile::build_profile_tree(
2406                    &optimized_plan.root,
2407                    &mut entries.into_iter(),
2408                );
2409                Ok(crate::query::profile::profile_result(
2410                    &profile_tree,
2411                    total_time_ms,
2412                ))
2413            });
2414        }
2415
2416        let has_mutations = optimized_plan.root.has_mutations();
2417
2418        let result = self.with_auto_commit(has_mutations, || {
2419            // Get transaction context for MVCC visibility
2420            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2421
2422            // Convert to physical plan with transaction context
2423            let planner =
2424                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2425            let mut physical_plan = planner.plan(&optimized_plan)?;
2426
2427            // Execute the plan
2428            let executor = Executor::with_columns(physical_plan.columns.clone())
2429                .with_deadline(self.query_deadline());
2430            executor.execute(physical_plan.operator.as_mut())
2431        });
2432
2433        #[cfg(feature = "metrics")]
2434        {
2435            #[cfg(not(target_arch = "wasm32"))]
2436            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2437            #[cfg(target_arch = "wasm32")]
2438            let elapsed_ms = None;
2439            self.record_query_metrics("cypher", elapsed_ms, &result);
2440        }
2441
2442        result
2443    }
2444
2445    /// Executes a Gremlin query.
2446    ///
2447    /// # Errors
2448    ///
2449    /// Returns an error if the query fails to parse or execute.
2450    ///
2451    /// # Examples
2452    ///
2453    /// ```no_run
2454    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2455    /// use grafeo_engine::GrafeoDB;
2456    ///
2457    /// let db = GrafeoDB::new_in_memory();
2458    /// let session = db.session();
2459    ///
2460    /// // Create some nodes first
2461    /// session.create_node(&["Person"]);
2462    ///
2463    /// // Query using Gremlin
2464    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
2465    /// # Ok(())
2466    /// # }
2467    /// ```
2468    #[cfg(feature = "gremlin")]
2469    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
2470        use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};
2471
2472        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2473        let start_time = Instant::now();
2474
2475        // Parse and translate the query to a logical plan
2476        let logical_plan = gremlin::translate(query)?;
2477
2478        // Semantic validation
2479        let mut binder = Binder::new();
2480        let _binding_context = binder.bind(&logical_plan)?;
2481
2482        // Optimize the plan
2483        let active = self.active_store();
2484        let optimizer = Optimizer::from_graph_store(&*active);
2485        let optimized_plan = optimizer.optimize(logical_plan)?;
2486
2487        let has_mutations = optimized_plan.root.has_mutations();
2488
2489        let result = self.with_auto_commit(has_mutations, || {
2490            // Get transaction context for MVCC visibility
2491            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2492
2493            // Convert to physical plan with transaction context
2494            let planner =
2495                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2496            let mut physical_plan = planner.plan(&optimized_plan)?;
2497
2498            // Execute the plan
2499            let executor = Executor::with_columns(physical_plan.columns.clone())
2500                .with_deadline(self.query_deadline());
2501            executor.execute(physical_plan.operator.as_mut())
2502        });
2503
2504        #[cfg(feature = "metrics")]
2505        {
2506            #[cfg(not(target_arch = "wasm32"))]
2507            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2508            #[cfg(target_arch = "wasm32")]
2509            let elapsed_ms = None;
2510            self.record_query_metrics("gremlin", elapsed_ms, &result);
2511        }
2512
2513        result
2514    }
2515
2516    /// Executes a Gremlin query with parameters.
2517    ///
2518    /// # Errors
2519    ///
2520    /// Returns an error if the query fails to parse or execute.
2521    #[cfg(feature = "gremlin")]
2522    pub fn execute_gremlin_with_params(
2523        &self,
2524        query: &str,
2525        params: std::collections::HashMap<String, Value>,
2526    ) -> Result<QueryResult> {
2527        use crate::query::processor::{QueryLanguage, QueryProcessor};
2528
2529        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2530        let start_time = Instant::now();
2531
2532        let has_mutations = Self::query_looks_like_mutation(query);
2533        let active = self.active_store();
2534
2535        let result = self.with_auto_commit(has_mutations, || {
2536            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2537            let processor = QueryProcessor::for_graph_store_with_transaction(
2538                Arc::clone(&active),
2539                Arc::clone(&self.transaction_manager),
2540            )?;
2541            let processor = if let Some(transaction_id) = transaction_id {
2542                processor.with_transaction_context(viewing_epoch, transaction_id)
2543            } else {
2544                processor
2545            };
2546            processor.process(query, QueryLanguage::Gremlin, Some(&params))
2547        });
2548
2549        #[cfg(feature = "metrics")]
2550        {
2551            #[cfg(not(target_arch = "wasm32"))]
2552            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2553            #[cfg(target_arch = "wasm32")]
2554            let elapsed_ms = None;
2555            self.record_query_metrics("gremlin", elapsed_ms, &result);
2556        }
2557
2558        result
2559    }
2560
2561    /// Executes a GraphQL query against the LPG store.
2562    ///
2563    /// # Errors
2564    ///
2565    /// Returns an error if the query fails to parse or execute.
2566    ///
2567    /// # Examples
2568    ///
2569    /// ```no_run
2570    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2571    /// use grafeo_engine::GrafeoDB;
2572    ///
2573    /// let db = GrafeoDB::new_in_memory();
2574    /// let session = db.session();
2575    ///
2576    /// // Create some nodes first
2577    /// session.create_node(&["User"]);
2578    ///
2579    /// // Query using GraphQL
2580    /// let result = session.execute_graphql("query { user { id name } }")?;
2581    /// # Ok(())
2582    /// # }
2583    /// ```
2584    #[cfg(feature = "graphql")]
2585    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
2586        use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::graphql};
2587
2588        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2589        let start_time = Instant::now();
2590
2591        let logical_plan = graphql::translate(query)?;
2592        let mut binder = Binder::new();
2593        let _binding_context = binder.bind(&logical_plan)?;
2594
2595        let active = self.active_store();
2596        let optimizer = Optimizer::from_graph_store(&*active);
2597        let optimized_plan = optimizer.optimize(logical_plan)?;
2598        let has_mutations = optimized_plan.root.has_mutations();
2599
2600        let result = self.with_auto_commit(has_mutations, || {
2601            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2602            let planner =
2603                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2604            let mut physical_plan = planner.plan(&optimized_plan)?;
2605            let executor = Executor::with_columns(physical_plan.columns.clone())
2606                .with_deadline(self.query_deadline());
2607            executor.execute(physical_plan.operator.as_mut())
2608        });
2609
2610        #[cfg(feature = "metrics")]
2611        {
2612            #[cfg(not(target_arch = "wasm32"))]
2613            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2614            #[cfg(target_arch = "wasm32")]
2615            let elapsed_ms = None;
2616            self.record_query_metrics("graphql", elapsed_ms, &result);
2617        }
2618
2619        result
2620    }
2621
2622    /// Executes a GraphQL query with parameters.
2623    ///
2624    /// # Errors
2625    ///
2626    /// Returns an error if the query fails to parse or execute.
2627    #[cfg(feature = "graphql")]
2628    pub fn execute_graphql_with_params(
2629        &self,
2630        query: &str,
2631        params: std::collections::HashMap<String, Value>,
2632    ) -> Result<QueryResult> {
2633        use crate::query::processor::{QueryLanguage, QueryProcessor};
2634
2635        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2636        let start_time = Instant::now();
2637
2638        let has_mutations = Self::query_looks_like_mutation(query);
2639        let active = self.active_store();
2640
2641        let result = self.with_auto_commit(has_mutations, || {
2642            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2643            let processor = QueryProcessor::for_graph_store_with_transaction(
2644                Arc::clone(&active),
2645                Arc::clone(&self.transaction_manager),
2646            )?;
2647            let processor = if let Some(transaction_id) = transaction_id {
2648                processor.with_transaction_context(viewing_epoch, transaction_id)
2649            } else {
2650                processor
2651            };
2652            processor.process(query, QueryLanguage::GraphQL, Some(&params))
2653        });
2654
2655        #[cfg(feature = "metrics")]
2656        {
2657            #[cfg(not(target_arch = "wasm32"))]
2658            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2659            #[cfg(target_arch = "wasm32")]
2660            let elapsed_ms = None;
2661            self.record_query_metrics("graphql", elapsed_ms, &result);
2662        }
2663
2664        result
2665    }
2666
2667    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
2668    ///
2669    /// # Errors
2670    ///
2671    /// Returns an error if the query fails to parse or execute.
2672    ///
2673    /// # Examples
2674    ///
2675    /// ```no_run
2676    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2677    /// use grafeo_engine::GrafeoDB;
2678    ///
2679    /// let db = GrafeoDB::new_in_memory();
2680    /// let session = db.session();
2681    ///
2682    /// let result = session.execute_sql(
2683    ///     "SELECT * FROM GRAPH_TABLE (
2684    ///         MATCH (n:Person)
2685    ///         COLUMNS (n.name AS name)
2686    ///     )"
2687    /// )?;
2688    /// # Ok(())
2689    /// # }
2690    /// ```
2691    #[cfg(feature = "sql-pgq")]
2692    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
2693        use crate::query::{
2694            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
2695            processor::QueryLanguage, translators::sql_pgq,
2696        };
2697
2698        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2699        let start_time = Instant::now();
2700
2701        // Parse and translate (always needed to check for DDL)
2702        let logical_plan = sql_pgq::translate(query)?;
2703
2704        // Handle DDL statements directly (they don't go through the query pipeline)
2705        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
2706            return Ok(QueryResult {
2707                columns: vec!["status".into()],
2708                column_types: vec![grafeo_common::types::LogicalType::String],
2709                rows: vec![vec![Value::from(format!(
2710                    "Property graph '{}' created ({} node tables, {} edge tables)",
2711                    cpg.name,
2712                    cpg.node_tables.len(),
2713                    cpg.edge_tables.len()
2714                ))]],
2715                execution_time_ms: None,
2716                rows_scanned: None,
2717                status_message: None,
2718                gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2719            });
2720        }
2721
2722        let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());
2723
2724        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2725            cached_plan
2726        } else {
2727            let mut binder = Binder::new();
2728            let _binding_context = binder.bind(&logical_plan)?;
2729            let active = self.active_store();
2730            let optimizer = Optimizer::from_graph_store(&*active);
2731            let plan = optimizer.optimize(logical_plan)?;
2732            self.query_cache.put_optimized(cache_key, plan.clone());
2733            plan
2734        };
2735
2736        let active = self.active_store();
2737        let has_mutations = optimized_plan.root.has_mutations();
2738
2739        let result = self.with_auto_commit(has_mutations, || {
2740            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2741            let planner =
2742                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2743            let mut physical_plan = planner.plan(&optimized_plan)?;
2744            let executor = Executor::with_columns(physical_plan.columns.clone())
2745                .with_deadline(self.query_deadline());
2746            executor.execute(physical_plan.operator.as_mut())
2747        });
2748
2749        #[cfg(feature = "metrics")]
2750        {
2751            #[cfg(not(target_arch = "wasm32"))]
2752            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2753            #[cfg(target_arch = "wasm32")]
2754            let elapsed_ms = None;
2755            self.record_query_metrics("sql", elapsed_ms, &result);
2756        }
2757
2758        result
2759    }
2760
2761    /// Executes a SQL/PGQ query with parameters.
2762    ///
2763    /// # Errors
2764    ///
2765    /// Returns an error if the query fails to parse or execute.
2766    #[cfg(feature = "sql-pgq")]
2767    pub fn execute_sql_with_params(
2768        &self,
2769        query: &str,
2770        params: std::collections::HashMap<String, Value>,
2771    ) -> Result<QueryResult> {
2772        use crate::query::processor::{QueryLanguage, QueryProcessor};
2773
2774        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2775        let start_time = Instant::now();
2776
2777        let has_mutations = Self::query_looks_like_mutation(query);
2778        let active = self.active_store();
2779
2780        let result = self.with_auto_commit(has_mutations, || {
2781            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2782            let processor = QueryProcessor::for_graph_store_with_transaction(
2783                Arc::clone(&active),
2784                Arc::clone(&self.transaction_manager),
2785            )?;
2786            let processor = if let Some(transaction_id) = transaction_id {
2787                processor.with_transaction_context(viewing_epoch, transaction_id)
2788            } else {
2789                processor
2790            };
2791            processor.process(query, QueryLanguage::SqlPgq, Some(&params))
2792        });
2793
2794        #[cfg(feature = "metrics")]
2795        {
2796            #[cfg(not(target_arch = "wasm32"))]
2797            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2798            #[cfg(target_arch = "wasm32")]
2799            let elapsed_ms = None;
2800            self.record_query_metrics("sql", elapsed_ms, &result);
2801        }
2802
2803        result
2804    }
2805
2806    /// Executes a query in the specified language by name.
2807    ///
2808    /// Supported language names: `"gql"`, `"cypher"`, `"gremlin"`, `"graphql"`,
2809    /// `"graphql-rdf"`, `"sparql"`, `"sql"`. Each requires the corresponding feature flag.
2810    ///
2811    /// # Errors
2812    ///
2813    /// Returns an error if the language is unknown/disabled or the query fails.
2814    pub fn execute_language(
2815        &self,
2816        query: &str,
2817        language: &str,
2818        params: Option<std::collections::HashMap<String, Value>>,
2819    ) -> Result<QueryResult> {
2820        let _span = tracing::info_span!(
2821            "grafeo::session::execute",
2822            language,
2823            query_len = query.len(),
2824        )
2825        .entered();
2826        match language {
2827            "gql" => {
2828                if let Some(p) = params {
2829                    self.execute_with_params(query, p)
2830                } else {
2831                    self.execute(query)
2832                }
2833            }
2834            #[cfg(feature = "cypher")]
2835            "cypher" => {
2836                if let Some(p) = params {
2837                    use crate::query::processor::{QueryLanguage, QueryProcessor};
2838
2839                    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2840                    let start_time = Instant::now();
2841
2842                    let has_mutations = Self::query_looks_like_mutation(query);
2843                    let active = self.active_store();
2844                    let result = self.with_auto_commit(has_mutations, || {
2845                        let processor = QueryProcessor::for_graph_store_with_transaction(
2846                            Arc::clone(&active),
2847                            Arc::clone(&self.transaction_manager),
2848                        )?;
2849                        let (viewing_epoch, transaction_id) = self.get_transaction_context();
2850                        let processor = if let Some(transaction_id) = transaction_id {
2851                            processor.with_transaction_context(viewing_epoch, transaction_id)
2852                        } else {
2853                            processor
2854                        };
2855                        processor.process(query, QueryLanguage::Cypher, Some(&p))
2856                    });
2857
2858                    #[cfg(feature = "metrics")]
2859                    {
2860                        #[cfg(not(target_arch = "wasm32"))]
2861                        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2862                        #[cfg(target_arch = "wasm32")]
2863                        let elapsed_ms = None;
2864                        self.record_query_metrics("cypher", elapsed_ms, &result);
2865                    }
2866
2867                    result
2868                } else {
2869                    self.execute_cypher(query)
2870                }
2871            }
2872            #[cfg(feature = "gremlin")]
2873            "gremlin" => {
2874                if let Some(p) = params {
2875                    self.execute_gremlin_with_params(query, p)
2876                } else {
2877                    self.execute_gremlin(query)
2878                }
2879            }
2880            #[cfg(feature = "graphql")]
2881            "graphql" => {
2882                if let Some(p) = params {
2883                    self.execute_graphql_with_params(query, p)
2884                } else {
2885                    self.execute_graphql(query)
2886                }
2887            }
2888            #[cfg(all(feature = "graphql", feature = "rdf"))]
2889            "graphql-rdf" => {
2890                if let Some(p) = params {
2891                    self.execute_graphql_rdf_with_params(query, p)
2892                } else {
2893                    self.execute_graphql_rdf(query)
2894                }
2895            }
2896            #[cfg(feature = "sql-pgq")]
2897            "sql" | "sql-pgq" => {
2898                if let Some(p) = params {
2899                    self.execute_sql_with_params(query, p)
2900                } else {
2901                    self.execute_sql(query)
2902                }
2903            }
2904            #[cfg(all(feature = "sparql", feature = "rdf"))]
2905            "sparql" => {
2906                if let Some(p) = params {
2907                    self.execute_sparql_with_params(query, p)
2908                } else {
2909                    self.execute_sparql(query)
2910                }
2911            }
2912            other => Err(grafeo_common::utils::error::Error::Query(
2913                grafeo_common::utils::error::QueryError::new(
2914                    grafeo_common::utils::error::QueryErrorKind::Semantic,
2915                    format!("Unknown query language: '{other}'"),
2916                ),
2917            )),
2918        }
2919    }
2920
2921    /// Begins a new transaction.
2922    ///
2923    /// # Errors
2924    ///
2925    /// Returns an error if a transaction is already active.
2926    ///
2927    /// # Examples
2928    ///
2929    /// ```no_run
2930    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2931    /// use grafeo_engine::GrafeoDB;
2932    ///
2933    /// let db = GrafeoDB::new_in_memory();
2934    /// let mut session = db.session();
2935    ///
2936    /// session.begin_transaction()?;
2937    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
2938    /// session.execute("INSERT (:Person {name: 'Gus'})")?;
2939    /// session.commit()?; // Both inserts committed atomically
2940    /// # Ok(())
2941    /// # }
2942    /// ```
2943    /// Clears all cached query plans.
2944    ///
2945    /// The plan cache is shared across all sessions on the same database,
2946    /// so clearing from one session affects all sessions.
2947    pub fn clear_plan_cache(&self) {
2948        self.query_cache.clear();
2949    }
2950
2951    /// Begins a new transaction on this session.
2952    ///
2953    /// Uses the default isolation level (`SnapshotIsolation`).
2954    ///
2955    /// # Errors
2956    ///
2957    /// Returns an error if a transaction is already active.
2958    pub fn begin_transaction(&mut self) -> Result<()> {
2959        self.begin_transaction_inner(false, None)
2960    }
2961
2962    /// Begins a transaction with a specific isolation level.
2963    ///
2964    /// See [`begin_transaction`](Self::begin_transaction) for the default (`SnapshotIsolation`).
2965    ///
2966    /// # Errors
2967    ///
2968    /// Returns an error if a transaction is already active.
2969    pub fn begin_transaction_with_isolation(
2970        &mut self,
2971        isolation_level: crate::transaction::IsolationLevel,
2972    ) -> Result<()> {
2973        self.begin_transaction_inner(false, Some(isolation_level))
2974    }
2975
2976    /// Core transaction begin logic, usable from both `&mut self` and `&self` paths.
2977    fn begin_transaction_inner(
2978        &self,
2979        read_only: bool,
2980        isolation_level: Option<crate::transaction::IsolationLevel>,
2981    ) -> Result<()> {
2982        let _span = tracing::debug_span!("grafeo::tx::begin", read_only).entered();
2983        let mut current = self.current_transaction.lock();
2984        if current.is_some() {
2985            // Nested transaction: create an auto-savepoint instead of a new tx.
2986            drop(current);
2987            let mut depth = self.transaction_nesting_depth.lock();
2988            *depth += 1;
2989            let sp_name = format!("_nested_tx_{}", *depth);
2990            self.savepoint(&sp_name)?;
2991            return Ok(());
2992        }
2993
2994        let active = self.active_lpg_store();
2995        self.transaction_start_node_count
2996            .store(active.node_count(), Ordering::Relaxed);
2997        self.transaction_start_edge_count
2998            .store(active.edge_count(), Ordering::Relaxed);
2999        let transaction_id = if let Some(level) = isolation_level {
3000            self.transaction_manager.begin_with_isolation(level)
3001        } else {
3002            self.transaction_manager.begin()
3003        };
3004        *current = Some(transaction_id);
3005        *self.read_only_tx.lock() = read_only;
3006
3007        // Record the initial graph as "touched" for cross-graph atomicity.
3008        // Uses the full storage key (schema/graph) for schema-scoped resolution.
3009        let key = self.active_graph_storage_key();
3010        let mut touched = self.touched_graphs.lock();
3011        touched.clear();
3012        touched.push(key);
3013
3014        #[cfg(feature = "metrics")]
3015        {
3016            crate::metrics::record_metric!(self.metrics, tx_active, inc);
3017            #[cfg(not(target_arch = "wasm32"))]
3018            {
3019                *self.tx_start_time.lock() = Some(Instant::now());
3020            }
3021        }
3022
3023        Ok(())
3024    }
3025
3026    /// Commits the current transaction.
3027    ///
3028    /// Makes all changes since [`begin_transaction`](Self::begin_transaction) permanent.
3029    ///
3030    /// # Errors
3031    ///
3032    /// Returns an error if no transaction is active.
3033    pub fn commit(&mut self) -> Result<()> {
3034        self.commit_inner()
3035    }
3036
3037    /// Core commit logic, usable from both `&mut self` and `&self` paths.
3038    fn commit_inner(&self) -> Result<()> {
3039        let _span = tracing::debug_span!("grafeo::tx::commit").entered();
3040        // Nested transaction: release the auto-savepoint (changes are preserved).
3041        {
3042            let mut depth = self.transaction_nesting_depth.lock();
3043            if *depth > 0 {
3044                let sp_name = format!("_nested_tx_{depth}");
3045                *depth -= 1;
3046                drop(depth);
3047                return self.release_savepoint(&sp_name);
3048            }
3049        }
3050
3051        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3052            grafeo_common::utils::error::Error::Transaction(
3053                grafeo_common::utils::error::TransactionError::InvalidState(
3054                    "No active transaction".to_string(),
3055                ),
3056            )
3057        })?;
3058
3059        // Validate the transaction first (conflict detection) before committing data.
3060        // If this fails, we rollback the data changes instead of making them permanent.
3061        let touched = self.touched_graphs.lock().clone();
3062        let commit_epoch = match self.transaction_manager.commit(transaction_id) {
3063            Ok(epoch) => epoch,
3064            Err(e) => {
3065                // Conflict detected: rollback the data changes
3066                for graph_name in &touched {
3067                    let store = self.resolve_store(graph_name);
3068                    store.rollback_transaction_properties(transaction_id);
3069                }
3070                #[cfg(feature = "rdf")]
3071                self.rollback_rdf_transaction(transaction_id);
3072                *self.read_only_tx.lock() = false;
3073                self.savepoints.lock().clear();
3074                self.touched_graphs.lock().clear();
3075                #[cfg(feature = "metrics")]
3076                {
3077                    crate::metrics::record_metric!(self.metrics, tx_active, dec);
3078                    crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
3079                    #[cfg(not(target_arch = "wasm32"))]
3080                    if let Some(start) = self.tx_start_time.lock().take() {
3081                        let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3082                        crate::metrics::record_metric!(
3083                            self.metrics,
3084                            tx_duration,
3085                            observe duration_ms
3086                        );
3087                    }
3088                }
3089                return Err(e);
3090            }
3091        };
3092
3093        // Finalize PENDING epochs: make uncommitted versions visible at the commit epoch.
3094        for graph_name in &touched {
3095            let store = self.resolve_store(graph_name);
3096            store.finalize_version_epochs(transaction_id, commit_epoch);
3097        }
3098
3099        // Commit succeeded: discard undo logs (make changes permanent)
3100        #[cfg(feature = "rdf")]
3101        self.commit_rdf_transaction(transaction_id);
3102
3103        for graph_name in &touched {
3104            let store = self.resolve_store(graph_name);
3105            store.commit_transaction_properties(transaction_id);
3106        }
3107
3108        // Sync epoch for all touched graphs so that convenience lookups
3109        // (edge_type, get_edge, get_node) can see versions at the latest epoch.
3110        let current_epoch = self.transaction_manager.current_epoch();
3111        for graph_name in &touched {
3112            let store = self.resolve_store(graph_name);
3113            store.sync_epoch(current_epoch);
3114        }
3115
3116        // Reset read-only flag, clear savepoints and touched graphs
3117        *self.read_only_tx.lock() = false;
3118        self.savepoints.lock().clear();
3119        self.touched_graphs.lock().clear();
3120
3121        // Auto-GC: periodically prune old MVCC versions
3122        if self.gc_interval > 0 {
3123            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
3124            if count.is_multiple_of(self.gc_interval) {
3125                let min_epoch = self.transaction_manager.min_active_epoch();
3126                for graph_name in &touched {
3127                    let store = self.resolve_store(graph_name);
3128                    store.gc_versions(min_epoch);
3129                }
3130                self.transaction_manager.gc();
3131                #[cfg(feature = "metrics")]
3132                crate::metrics::record_metric!(self.metrics, gc_runs, inc);
3133            }
3134        }
3135
3136        #[cfg(feature = "metrics")]
3137        {
3138            crate::metrics::record_metric!(self.metrics, tx_active, dec);
3139            crate::metrics::record_metric!(self.metrics, tx_committed, inc);
3140            #[cfg(not(target_arch = "wasm32"))]
3141            if let Some(start) = self.tx_start_time.lock().take() {
3142                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3143                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3144            }
3145        }
3146
3147        Ok(())
3148    }
3149
3150    /// Aborts the current transaction.
3151    ///
3152    /// Discards all changes since [`begin_transaction`](Self::begin_transaction).
3153    ///
3154    /// # Errors
3155    ///
3156    /// Returns an error if no transaction is active.
3157    ///
3158    /// # Examples
3159    ///
3160    /// ```no_run
3161    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3162    /// use grafeo_engine::GrafeoDB;
3163    ///
3164    /// let db = GrafeoDB::new_in_memory();
3165    /// let mut session = db.session();
3166    ///
3167    /// session.begin_transaction()?;
3168    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3169    /// session.rollback()?; // Insert is discarded
3170    /// # Ok(())
3171    /// # }
3172    /// ```
3173    pub fn rollback(&mut self) -> Result<()> {
3174        self.rollback_inner()
3175    }
3176
3177    /// Core rollback logic, usable from both `&mut self` and `&self` paths.
3178    fn rollback_inner(&self) -> Result<()> {
3179        let _span = tracing::debug_span!("grafeo::tx::rollback").entered();
3180        // Nested transaction: rollback to the auto-savepoint.
3181        {
3182            let mut depth = self.transaction_nesting_depth.lock();
3183            if *depth > 0 {
3184                let sp_name = format!("_nested_tx_{depth}");
3185                *depth -= 1;
3186                drop(depth);
3187                return self.rollback_to_savepoint(&sp_name);
3188            }
3189        }
3190
3191        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3192            grafeo_common::utils::error::Error::Transaction(
3193                grafeo_common::utils::error::TransactionError::InvalidState(
3194                    "No active transaction".to_string(),
3195                ),
3196            )
3197        })?;
3198
3199        // Reset read-only flag
3200        *self.read_only_tx.lock() = false;
3201
3202        // Discard uncommitted versions in ALL touched LPG stores (cross-graph atomicity).
3203        let touched = self.touched_graphs.lock().clone();
3204        for graph_name in &touched {
3205            let store = self.resolve_store(graph_name);
3206            store.discard_uncommitted_versions(transaction_id);
3207        }
3208
3209        // Discard pending operations in the RDF store
3210        #[cfg(feature = "rdf")]
3211        self.rollback_rdf_transaction(transaction_id);
3212
3213        // Clear savepoints and touched graphs
3214        self.savepoints.lock().clear();
3215        self.touched_graphs.lock().clear();
3216
3217        // Mark transaction as aborted in the manager
3218        let result = self.transaction_manager.abort(transaction_id);
3219
3220        #[cfg(feature = "metrics")]
3221        if result.is_ok() {
3222            crate::metrics::record_metric!(self.metrics, tx_active, dec);
3223            crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
3224            #[cfg(not(target_arch = "wasm32"))]
3225            if let Some(start) = self.tx_start_time.lock().take() {
3226                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3227                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3228            }
3229        }
3230
3231        result
3232    }
3233
3234    /// Creates a named savepoint within the current transaction.
3235    ///
3236    /// The savepoint captures the current node/edge ID counters so that
3237    /// [`rollback_to_savepoint`](Self::rollback_to_savepoint) can discard
3238    /// entities created after this point.
3239    ///
3240    /// # Errors
3241    ///
3242    /// Returns an error if no transaction is active.
3243    pub fn savepoint(&self, name: &str) -> Result<()> {
3244        let tx_id = self.current_transaction.lock().ok_or_else(|| {
3245            grafeo_common::utils::error::Error::Transaction(
3246                grafeo_common::utils::error::TransactionError::InvalidState(
3247                    "No active transaction".to_string(),
3248                ),
3249            )
3250        })?;
3251
3252        // Capture state for every graph touched so far.
3253        let touched = self.touched_graphs.lock().clone();
3254        let graph_snapshots: Vec<GraphSavepoint> = touched
3255            .iter()
3256            .map(|graph_name| {
3257                let store = self.resolve_store(graph_name);
3258                GraphSavepoint {
3259                    graph_name: graph_name.clone(),
3260                    next_node_id: store.peek_next_node_id(),
3261                    next_edge_id: store.peek_next_edge_id(),
3262                    undo_log_position: store.property_undo_log_position(tx_id),
3263                }
3264            })
3265            .collect();
3266
3267        self.savepoints.lock().push(SavepointState {
3268            name: name.to_string(),
3269            graph_snapshots,
3270            active_graph: self.current_graph.lock().clone(),
3271        });
3272        Ok(())
3273    }
3274
3275    /// Rolls back to a named savepoint, undoing all writes made after it.
3276    ///
3277    /// The savepoint and any savepoints created after it are removed.
3278    /// Entities with IDs >= the savepoint snapshot are discarded.
3279    ///
3280    /// # Errors
3281    ///
3282    /// Returns an error if no transaction is active or the savepoint does not exist.
3283    pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
3284        let transaction_id = self.current_transaction.lock().ok_or_else(|| {
3285            grafeo_common::utils::error::Error::Transaction(
3286                grafeo_common::utils::error::TransactionError::InvalidState(
3287                    "No active transaction".to_string(),
3288                ),
3289            )
3290        })?;
3291
3292        let mut savepoints = self.savepoints.lock();
3293
3294        // Find the savepoint by name (search from the end for nested savepoints)
3295        let pos = savepoints
3296            .iter()
3297            .rposition(|sp| sp.name == name)
3298            .ok_or_else(|| {
3299                grafeo_common::utils::error::Error::Transaction(
3300                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
3301                        "Savepoint '{name}' not found"
3302                    )),
3303                )
3304            })?;
3305
3306        let sp_state = savepoints[pos].clone();
3307
3308        // Remove this savepoint and all later ones
3309        savepoints.truncate(pos);
3310        drop(savepoints);
3311
3312        // Roll back each graph that was captured in the savepoint.
3313        for gs in &sp_state.graph_snapshots {
3314            let store = self.resolve_store(&gs.graph_name);
3315
3316            // Replay property/label undo entries recorded after the savepoint
3317            store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);
3318
3319            // Discard entities created after the savepoint
3320            let current_next_node = store.peek_next_node_id();
3321            let current_next_edge = store.peek_next_edge_id();
3322
3323            let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
3324                .map(NodeId::new)
3325                .collect();
3326            let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
3327                .map(EdgeId::new)
3328                .collect();
3329
3330            if !node_ids.is_empty() || !edge_ids.is_empty() {
3331                store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
3332            }
3333        }
3334
3335        // Also roll back any graphs that were touched AFTER the savepoint
3336        // but not captured in it. These need full discard since the savepoint
3337        // didn't include them.
3338        let touched = self.touched_graphs.lock().clone();
3339        for graph_name in &touched {
3340            let already_captured = sp_state
3341                .graph_snapshots
3342                .iter()
3343                .any(|gs| gs.graph_name == *graph_name);
3344            if !already_captured {
3345                let store = self.resolve_store(graph_name);
3346                store.discard_uncommitted_versions(transaction_id);
3347            }
3348        }
3349
3350        // Restore touched_graphs to only the graphs that were known at savepoint time.
3351        let mut touched = self.touched_graphs.lock();
3352        touched.clear();
3353        for gs in &sp_state.graph_snapshots {
3354            if !touched.contains(&gs.graph_name) {
3355                touched.push(gs.graph_name.clone());
3356            }
3357        }
3358
3359        Ok(())
3360    }
3361
3362    /// Releases (removes) a named savepoint without rolling back.
3363    ///
3364    /// # Errors
3365    ///
3366    /// Returns an error if no transaction is active or the savepoint does not exist.
3367    pub fn release_savepoint(&self, name: &str) -> Result<()> {
3368        let _tx_id = self.current_transaction.lock().ok_or_else(|| {
3369            grafeo_common::utils::error::Error::Transaction(
3370                grafeo_common::utils::error::TransactionError::InvalidState(
3371                    "No active transaction".to_string(),
3372                ),
3373            )
3374        })?;
3375
3376        let mut savepoints = self.savepoints.lock();
3377        let pos = savepoints
3378            .iter()
3379            .rposition(|sp| sp.name == name)
3380            .ok_or_else(|| {
3381                grafeo_common::utils::error::Error::Transaction(
3382                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
3383                        "Savepoint '{name}' not found"
3384                    )),
3385                )
3386            })?;
3387        savepoints.remove(pos);
3388        Ok(())
3389    }
3390
3391    /// Returns whether a transaction is active.
3392    #[must_use]
3393    pub fn in_transaction(&self) -> bool {
3394        self.current_transaction.lock().is_some()
3395    }
3396
3397    /// Returns the current transaction ID, if any.
3398    #[must_use]
3399    pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
3400        *self.current_transaction.lock()
3401    }
3402
3403    /// Returns a reference to the transaction manager.
3404    #[must_use]
3405    pub(crate) fn transaction_manager(&self) -> &TransactionManager {
3406        &self.transaction_manager
3407    }
3408
3409    /// Returns the store's current node count and the count at transaction start.
3410    #[must_use]
3411    pub(crate) fn node_count_delta(&self) -> (usize, usize) {
3412        (
3413            self.transaction_start_node_count.load(Ordering::Relaxed),
3414            self.active_lpg_store().node_count(),
3415        )
3416    }
3417
3418    /// Returns the store's current edge count and the count at transaction start.
3419    #[must_use]
3420    pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
3421        (
3422            self.transaction_start_edge_count.load(Ordering::Relaxed),
3423            self.active_lpg_store().edge_count(),
3424        )
3425    }
3426
3427    /// Prepares the current transaction for a two-phase commit.
3428    ///
3429    /// Returns a [`PreparedCommit`](crate::transaction::PreparedCommit) that
3430    /// lets you inspect pending changes and attach metadata before finalizing.
3431    /// The mutable borrow prevents concurrent operations while the commit is
3432    /// pending.
3433    ///
3434    /// If the `PreparedCommit` is dropped without calling `commit()` or
3435    /// `abort()`, the transaction is automatically rolled back.
3436    ///
3437    /// # Errors
3438    ///
3439    /// Returns an error if no transaction is active.
3440    ///
3441    /// # Examples
3442    ///
3443    /// ```no_run
3444    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3445    /// use grafeo_engine::GrafeoDB;
3446    ///
3447    /// let db = GrafeoDB::new_in_memory();
3448    /// let mut session = db.session();
3449    ///
3450    /// session.begin_transaction()?;
3451    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3452    ///
3453    /// let mut prepared = session.prepare_commit()?;
3454    /// println!("Nodes written: {}", prepared.info().nodes_written);
3455    /// prepared.set_metadata("audit_user", "admin");
3456    /// prepared.commit()?;
3457    /// # Ok(())
3458    /// # }
3459    /// ```
3460    pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
3461        crate::transaction::PreparedCommit::new(self)
3462    }
3463
3464    /// Sets auto-commit mode.
3465    pub fn set_auto_commit(&mut self, auto_commit: bool) {
3466        self.auto_commit = auto_commit;
3467    }
3468
3469    /// Returns whether auto-commit is enabled.
3470    #[must_use]
3471    pub fn auto_commit(&self) -> bool {
3472        self.auto_commit
3473    }
3474
3475    /// Returns `true` if auto-commit should wrap this execution.
3476    ///
3477    /// Auto-commit kicks in when: the session is in auto-commit mode,
3478    /// no explicit transaction is active, and the query mutates data.
3479    fn needs_auto_commit(&self, has_mutations: bool) -> bool {
3480        self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
3481    }
3482
3483    /// Wraps `body` in an automatic begin/commit when [`needs_auto_commit`]
3484    /// returns `true`. On error the transaction is rolled back.
3485    fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
3486    where
3487        F: FnOnce() -> Result<QueryResult>,
3488    {
3489        if self.needs_auto_commit(has_mutations) {
3490            self.begin_transaction_inner(false, None)?;
3491            match body() {
3492                Ok(result) => {
3493                    self.commit_inner()?;
3494                    Ok(result)
3495                }
3496                Err(e) => {
3497                    let _ = self.rollback_inner();
3498                    Err(e)
3499                }
3500            }
3501        } else {
3502            body()
3503        }
3504    }
3505
3506    /// Quick heuristic: returns `true` when the query text looks like it
3507    /// performs a mutation. Used by `_with_params` paths that go through the
3508    /// `QueryProcessor` (where the logical plan isn't available before
3509    /// execution). False negatives are harmless: the data just won't be
3510    /// auto-committed, which matches the prior behaviour.
3511    fn query_looks_like_mutation(query: &str) -> bool {
3512        let upper = query.to_ascii_uppercase();
3513        upper.contains("INSERT")
3514            || upper.contains("CREATE")
3515            || upper.contains("DELETE")
3516            || upper.contains("MERGE")
3517            || upper.contains("SET")
3518            || upper.contains("REMOVE")
3519            || upper.contains("DROP")
3520            || upper.contains("ALTER")
3521    }
3522
3523    /// Computes the wall-clock deadline for query execution.
3524    #[must_use]
3525    fn query_deadline(&self) -> Option<Instant> {
3526        #[cfg(not(target_arch = "wasm32"))]
3527        {
3528            self.query_timeout.map(|d| Instant::now() + d)
3529        }
3530        #[cfg(target_arch = "wasm32")]
3531        {
3532            let _ = &self.query_timeout;
3533            None
3534        }
3535    }
3536
3537    /// Records query metrics for any language.
3538    ///
3539    /// Called after query execution to update counters, latency histogram,
3540    /// and per-language tracking. `elapsed_ms` should be `None` on WASM
3541    /// where `Instant` is unavailable.
3542    #[cfg(feature = "metrics")]
3543    fn record_query_metrics(
3544        &self,
3545        language: &str,
3546        elapsed_ms: Option<f64>,
3547        result: &Result<crate::database::QueryResult>,
3548    ) {
3549        use crate::metrics::record_metric;
3550
3551        record_metric!(self.metrics, query_count, inc);
3552        if let Some(ref reg) = self.metrics {
3553            reg.query_count_by_language.increment(language);
3554        }
3555        if let Some(ms) = elapsed_ms {
3556            record_metric!(self.metrics, query_latency, observe ms);
3557        }
3558        match result {
3559            Ok(r) => {
3560                let returned = r.rows.len() as u64;
3561                record_metric!(self.metrics, rows_returned, add returned);
3562                if let Some(scanned) = r.rows_scanned {
3563                    record_metric!(self.metrics, rows_scanned, add scanned);
3564                }
3565            }
3566            Err(e) => {
3567                record_metric!(self.metrics, query_errors, inc);
3568                // Detect timeout errors
3569                let msg = e.to_string();
3570                if msg.contains("exceeded timeout") {
3571                    record_metric!(self.metrics, query_timeouts, inc);
3572                }
3573            }
3574        }
3575    }
3576
3577    /// Evaluates a simple integer literal from a session parameter expression.
3578    fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
3579        use grafeo_adapters::query::gql::ast::{Expression, Literal};
3580        match expr {
3581            Expression::Literal(Literal::Integer(n)) => Some(*n),
3582            _ => None,
3583        }
3584    }
3585
3586    /// Returns the current transaction context for MVCC visibility.
3587    ///
3588    /// Returns `(viewing_epoch, transaction_id)` where:
3589    /// - `viewing_epoch` is the epoch at which to check version visibility
3590    /// - `transaction_id` is the current transaction ID (if in a transaction)
3591    #[must_use]
3592    fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
3593        // Time-travel override takes precedence (read-only, no tx context)
3594        if let Some(epoch) = *self.viewing_epoch_override.lock() {
3595            return (epoch, None);
3596        }
3597
3598        if let Some(transaction_id) = *self.current_transaction.lock() {
3599            // In a transaction: use the transaction's start epoch
3600            let epoch = self
3601                .transaction_manager
3602                .start_epoch(transaction_id)
3603                .unwrap_or_else(|| self.transaction_manager.current_epoch());
3604            (epoch, Some(transaction_id))
3605        } else {
3606            // No transaction: use current epoch
3607            (self.transaction_manager.current_epoch(), None)
3608        }
3609    }
3610
3611    /// Creates a planner with transaction context and constraint validator.
3612    ///
3613    /// The `store` parameter is the graph store to plan against (use
3614    /// `self.active_store()` for graph-aware execution).
3615    fn create_planner_for_store(
3616        &self,
3617        store: Arc<dyn GraphStoreMut>,
3618        viewing_epoch: EpochId,
3619        transaction_id: Option<TransactionId>,
3620    ) -> crate::query::Planner {
3621        self.create_planner_for_store_with_read_only(store, viewing_epoch, transaction_id, false)
3622    }
3623
3624    fn create_planner_for_store_with_read_only(
3625        &self,
3626        store: Arc<dyn GraphStoreMut>,
3627        viewing_epoch: EpochId,
3628        transaction_id: Option<TransactionId>,
3629        read_only: bool,
3630    ) -> crate::query::Planner {
3631        use crate::query::Planner;
3632        use grafeo_core::execution::operators::{LazyValue, SessionContext};
3633
3634        // Capture store reference for lazy introspection (only computed if info()/schema() called).
3635        let info_store = Arc::clone(&store);
3636        let schema_store = Arc::clone(&store);
3637
3638        let session_context = SessionContext {
3639            current_schema: self.current_schema(),
3640            current_graph: self.current_graph(),
3641            db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
3642            schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
3643        };
3644
3645        let mut planner = Planner::with_context(
3646            Arc::clone(&store),
3647            Arc::clone(&self.transaction_manager),
3648            transaction_id,
3649            viewing_epoch,
3650        )
3651        .with_factorized_execution(self.factorized_execution)
3652        .with_catalog(Arc::clone(&self.catalog))
3653        .with_session_context(session_context)
3654        .with_read_only(read_only);
3655
3656        // Attach the constraint validator for schema enforcement
3657        let validator =
3658            CatalogConstraintValidator::new(Arc::clone(&self.catalog)).with_store(store);
3659        planner = planner.with_validator(Arc::new(validator));
3660
3661        planner
3662    }
3663
3664    /// Builds a `Value::Map` for the `info()` introspection function.
3665    fn build_info_value(store: &dyn GraphStoreMut) -> Value {
3666        use grafeo_common::types::PropertyKey;
3667        use std::collections::BTreeMap;
3668
3669        let mut map = BTreeMap::new();
3670        map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
3671        map.insert(
3672            PropertyKey::from("node_count"),
3673            Value::Int64(store.node_count() as i64),
3674        );
3675        map.insert(
3676            PropertyKey::from("edge_count"),
3677            Value::Int64(store.edge_count() as i64),
3678        );
3679        map.insert(
3680            PropertyKey::from("version"),
3681            Value::String(env!("CARGO_PKG_VERSION").into()),
3682        );
3683        Value::Map(map.into())
3684    }
3685
3686    /// Builds a `Value::Map` for the `schema()` introspection function.
3687    fn build_schema_value(store: &dyn GraphStoreMut) -> Value {
3688        use grafeo_common::types::PropertyKey;
3689        use std::collections::BTreeMap;
3690
3691        let labels: Vec<Value> = store
3692            .all_labels()
3693            .into_iter()
3694            .map(|l| Value::String(l.into()))
3695            .collect();
3696        let edge_types: Vec<Value> = store
3697            .all_edge_types()
3698            .into_iter()
3699            .map(|t| Value::String(t.into()))
3700            .collect();
3701        let property_keys: Vec<Value> = store
3702            .all_property_keys()
3703            .into_iter()
3704            .map(|k| Value::String(k.into()))
3705            .collect();
3706
3707        let mut map = BTreeMap::new();
3708        map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
3709        map.insert(
3710            PropertyKey::from("edge_types"),
3711            Value::List(edge_types.into()),
3712        );
3713        map.insert(
3714            PropertyKey::from("property_keys"),
3715            Value::List(property_keys.into()),
3716        );
3717        Value::Map(map.into())
3718    }
3719
3720    /// Creates a node directly (bypassing query execution).
3721    ///
3722    /// This is a low-level API for testing and direct manipulation.
3723    /// If a transaction is active, the node will be versioned with the transaction ID.
3724    pub fn create_node(&self, labels: &[&str]) -> NodeId {
3725        let (epoch, transaction_id) = self.get_transaction_context();
3726        self.active_lpg_store().create_node_versioned(
3727            labels,
3728            epoch,
3729            transaction_id.unwrap_or(TransactionId::SYSTEM),
3730        )
3731    }
3732
3733    /// Creates a node with properties.
3734    ///
3735    /// If a transaction is active, the node will be versioned with the transaction ID.
3736    pub fn create_node_with_props<'a>(
3737        &self,
3738        labels: &[&str],
3739        properties: impl IntoIterator<Item = (&'a str, Value)>,
3740    ) -> NodeId {
3741        let (epoch, transaction_id) = self.get_transaction_context();
3742        self.active_lpg_store().create_node_with_props_versioned(
3743            labels,
3744            properties,
3745            epoch,
3746            transaction_id.unwrap_or(TransactionId::SYSTEM),
3747        )
3748    }
3749
3750    /// Creates an edge between two nodes.
3751    ///
3752    /// This is a low-level API for testing and direct manipulation.
3753    /// If a transaction is active, the edge will be versioned with the transaction ID.
3754    pub fn create_edge(
3755        &self,
3756        src: NodeId,
3757        dst: NodeId,
3758        edge_type: &str,
3759    ) -> grafeo_common::types::EdgeId {
3760        let (epoch, transaction_id) = self.get_transaction_context();
3761        self.active_lpg_store().create_edge_versioned(
3762            src,
3763            dst,
3764            edge_type,
3765            epoch,
3766            transaction_id.unwrap_or(TransactionId::SYSTEM),
3767        )
3768    }
3769
3770    // =========================================================================
3771    // Direct Lookup APIs (bypass query planning for O(1) point reads)
3772    // =========================================================================
3773
3774    /// Gets a node by ID directly, bypassing query planning.
3775    ///
3776    /// This is the fastest way to retrieve a single node when you know its ID.
3777    /// Skips parsing, binding, optimization, and physical planning entirely.
3778    ///
3779    /// # Performance
3780    ///
3781    /// - Time complexity: O(1) average case
3782    /// - No lock contention (uses DashMap internally)
3783    /// - ~20-30x faster than equivalent MATCH query
3784    ///
3785    /// # Example
3786    ///
3787    /// ```no_run
3788    /// # use grafeo_engine::GrafeoDB;
3789    /// # let db = GrafeoDB::new_in_memory();
3790    /// let session = db.session();
3791    /// let node_id = session.create_node(&["Person"]);
3792    ///
3793    /// // Direct lookup - O(1), no query planning
3794    /// let node = session.get_node(node_id);
3795    /// assert!(node.is_some());
3796    /// ```
3797    #[must_use]
3798    pub fn get_node(&self, id: NodeId) -> Option<Node> {
3799        let (epoch, transaction_id) = self.get_transaction_context();
3800        self.active_lpg_store().get_node_versioned(
3801            id,
3802            epoch,
3803            transaction_id.unwrap_or(TransactionId::SYSTEM),
3804        )
3805    }
3806
3807    /// Gets a single property from a node by ID, bypassing query planning.
3808    ///
3809    /// More efficient than `get_node()` when you only need one property,
3810    /// as it avoids loading the full node with all properties.
3811    ///
3812    /// # Performance
3813    ///
3814    /// - Time complexity: O(1) average case
3815    /// - No query planning overhead
3816    ///
3817    /// # Example
3818    ///
3819    /// ```no_run
3820    /// # use grafeo_engine::GrafeoDB;
3821    /// # use grafeo_common::types::Value;
3822    /// # let db = GrafeoDB::new_in_memory();
3823    /// let session = db.session();
3824    /// let id = session.create_node_with_props(&["Person"], [("name", "Alix".into())]);
3825    ///
3826    /// // Direct property access - O(1)
3827    /// let name = session.get_node_property(id, "name");
3828    /// assert_eq!(name, Some(Value::String("Alix".into())));
3829    /// ```
3830    #[must_use]
3831    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
3832        self.get_node(id)
3833            .and_then(|node| node.get_property(key).cloned())
3834    }
3835
3836    /// Gets an edge by ID directly, bypassing query planning.
3837    ///
3838    /// # Performance
3839    ///
3840    /// - Time complexity: O(1) average case
3841    /// - No lock contention
3842    #[must_use]
3843    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
3844        let (epoch, transaction_id) = self.get_transaction_context();
3845        self.active_lpg_store().get_edge_versioned(
3846            id,
3847            epoch,
3848            transaction_id.unwrap_or(TransactionId::SYSTEM),
3849        )
3850    }
3851
3852    /// Gets outgoing neighbors of a node directly, bypassing query planning.
3853    ///
3854    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
3855    ///
3856    /// # Performance
3857    ///
3858    /// - Time complexity: O(degree) where degree is the number of outgoing edges
3859    /// - Uses adjacency index for direct access
3860    /// - ~10-20x faster than equivalent MATCH query
3861    ///
3862    /// # Example
3863    ///
3864    /// ```no_run
3865    /// # use grafeo_engine::GrafeoDB;
3866    /// # let db = GrafeoDB::new_in_memory();
3867    /// let session = db.session();
3868    /// let alix = session.create_node(&["Person"]);
3869    /// let gus = session.create_node(&["Person"]);
3870    /// session.create_edge(alix, gus, "KNOWS");
3871    ///
3872    /// // Direct neighbor lookup - O(degree)
3873    /// let neighbors = session.get_neighbors_outgoing(alix);
3874    /// assert_eq!(neighbors.len(), 1);
3875    /// assert_eq!(neighbors[0].0, gus);
3876    /// ```
3877    #[must_use]
3878    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
3879        self.active_lpg_store()
3880            .edges_from(node, Direction::Outgoing)
3881            .collect()
3882    }
3883
3884    /// Gets incoming neighbors of a node directly, bypassing query planning.
3885    ///
3886    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
3887    ///
3888    /// # Performance
3889    ///
3890    /// - Time complexity: O(degree) where degree is the number of incoming edges
3891    /// - Uses backward adjacency index for direct access
3892    #[must_use]
3893    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
3894        self.active_lpg_store()
3895            .edges_from(node, Direction::Incoming)
3896            .collect()
3897    }
3898
3899    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
3900    ///
3901    /// # Example
3902    ///
3903    /// ```no_run
3904    /// # use grafeo_engine::GrafeoDB;
3905    /// # let db = GrafeoDB::new_in_memory();
3906    /// # let session = db.session();
3907    /// # let alix = session.create_node(&["Person"]);
3908    /// let neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
3909    /// ```
3910    #[must_use]
3911    pub fn get_neighbors_outgoing_by_type(
3912        &self,
3913        node: NodeId,
3914        edge_type: &str,
3915    ) -> Vec<(NodeId, EdgeId)> {
3916        self.active_lpg_store()
3917            .edges_from(node, Direction::Outgoing)
3918            .filter(|(_, edge_id)| {
3919                self.get_edge(*edge_id)
3920                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
3921            })
3922            .collect()
3923    }
3924
3925    /// Checks if a node exists, bypassing query planning.
3926    ///
3927    /// # Performance
3928    ///
3929    /// - Time complexity: O(1)
3930    /// - Fastest existence check available
3931    #[must_use]
3932    pub fn node_exists(&self, id: NodeId) -> bool {
3933        self.get_node(id).is_some()
3934    }
3935
3936    /// Checks if an edge exists, bypassing query planning.
3937    #[must_use]
3938    pub fn edge_exists(&self, id: EdgeId) -> bool {
3939        self.get_edge(id).is_some()
3940    }
3941
3942    /// Gets the degree (number of edges) of a node.
3943    ///
3944    /// Returns (outgoing_degree, incoming_degree).
3945    #[must_use]
3946    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
3947        let active = self.active_lpg_store();
3948        let out = active.out_degree(node);
3949        let in_degree = active.in_degree(node);
3950        (out, in_degree)
3951    }
3952
3953    /// Batch lookup of multiple nodes by ID.
3954    ///
3955    /// More efficient than calling `get_node()` in a loop because it
3956    /// amortizes overhead.
3957    ///
3958    /// # Performance
3959    ///
3960    /// - Time complexity: O(n) where n is the number of IDs
3961    /// - Better cache utilization than individual lookups
3962    #[must_use]
3963    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
3964        let (epoch, transaction_id) = self.get_transaction_context();
3965        let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
3966        let active = self.active_lpg_store();
3967        ids.iter()
3968            .map(|&id| active.get_node_versioned(id, epoch, tx))
3969            .collect()
3970    }
3971
3972    // ── Change Data Capture ─────────────────────────────────────────────
3973
3974    /// Returns the full change history for an entity (node or edge).
3975    #[cfg(feature = "cdc")]
3976    pub fn history(
3977        &self,
3978        entity_id: impl Into<crate::cdc::EntityId>,
3979    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
3980        Ok(self.cdc_log.history(entity_id.into()))
3981    }
3982
3983    /// Returns change events for an entity since the given epoch.
3984    #[cfg(feature = "cdc")]
3985    pub fn history_since(
3986        &self,
3987        entity_id: impl Into<crate::cdc::EntityId>,
3988        since_epoch: EpochId,
3989    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
3990        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
3991    }
3992
3993    /// Returns all change events across all entities in an epoch range.
3994    #[cfg(feature = "cdc")]
3995    pub fn changes_between(
3996        &self,
3997        start_epoch: EpochId,
3998        end_epoch: EpochId,
3999    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4000        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
4001    }
4002}
4003
4004impl Drop for Session {
4005    fn drop(&mut self) {
4006        // Auto-rollback any active transaction to prevent leaked MVCC state,
4007        // dangling write locks, and uncommitted versions lingering in the store.
4008        if self.in_transaction() {
4009            let _ = self.rollback_inner();
4010        }
4011
4012        #[cfg(feature = "metrics")]
4013        if let Some(ref reg) = self.metrics {
4014            reg.session_active
4015                .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
4016        }
4017    }
4018}
4019
4020#[cfg(test)]
4021mod tests {
4022    use super::parse_default_literal;
4023    use crate::database::GrafeoDB;
4024    use grafeo_common::types::Value;
4025
4026    // -----------------------------------------------------------------------
4027    // parse_default_literal
4028    // -----------------------------------------------------------------------
4029
4030    #[test]
4031    fn parse_default_literal_null() {
4032        assert_eq!(parse_default_literal("null"), Value::Null);
4033        assert_eq!(parse_default_literal("NULL"), Value::Null);
4034        assert_eq!(parse_default_literal("Null"), Value::Null);
4035    }
4036
4037    #[test]
4038    fn parse_default_literal_bool() {
4039        assert_eq!(parse_default_literal("true"), Value::Bool(true));
4040        assert_eq!(parse_default_literal("TRUE"), Value::Bool(true));
4041        assert_eq!(parse_default_literal("false"), Value::Bool(false));
4042        assert_eq!(parse_default_literal("FALSE"), Value::Bool(false));
4043    }
4044
4045    #[test]
4046    fn parse_default_literal_string_single_quoted() {
4047        assert_eq!(
4048            parse_default_literal("'hello'"),
4049            Value::String("hello".into())
4050        );
4051    }
4052
4053    #[test]
4054    fn parse_default_literal_string_double_quoted() {
4055        assert_eq!(
4056            parse_default_literal("\"world\""),
4057            Value::String("world".into())
4058        );
4059    }
4060
4061    #[test]
4062    fn parse_default_literal_integer() {
4063        assert_eq!(parse_default_literal("42"), Value::Int64(42));
4064        assert_eq!(parse_default_literal("-7"), Value::Int64(-7));
4065        assert_eq!(parse_default_literal("0"), Value::Int64(0));
4066    }
4067
4068    #[test]
4069    fn parse_default_literal_float() {
4070        assert_eq!(parse_default_literal("9.81"), Value::Float64(9.81_f64));
4071        assert_eq!(parse_default_literal("-0.5"), Value::Float64(-0.5));
4072    }
4073
4074    #[test]
4075    fn parse_default_literal_fallback_string() {
4076        // Not a recognized literal, not quoted, not a number
4077        assert_eq!(
4078            parse_default_literal("some_identifier"),
4079            Value::String("some_identifier".into())
4080        );
4081    }
4082
4083    #[test]
4084    fn test_session_create_node() {
4085        let db = GrafeoDB::new_in_memory();
4086        let session = db.session();
4087
4088        let id = session.create_node(&["Person"]);
4089        assert!(id.is_valid());
4090        assert_eq!(db.node_count(), 1);
4091    }
4092
4093    #[test]
4094    fn test_session_transaction() {
4095        let db = GrafeoDB::new_in_memory();
4096        let mut session = db.session();
4097
4098        assert!(!session.in_transaction());
4099
4100        session.begin_transaction().unwrap();
4101        assert!(session.in_transaction());
4102
4103        session.commit().unwrap();
4104        assert!(!session.in_transaction());
4105    }
4106
4107    #[test]
4108    fn test_session_transaction_context() {
4109        let db = GrafeoDB::new_in_memory();
4110        let mut session = db.session();
4111
4112        // Without transaction - context should have current epoch and no transaction_id
4113        let (_epoch1, transaction_id1) = session.get_transaction_context();
4114        assert!(transaction_id1.is_none());
4115
4116        // Start a transaction
4117        session.begin_transaction().unwrap();
4118        let (epoch2, transaction_id2) = session.get_transaction_context();
4119        assert!(transaction_id2.is_some());
4120        // Transaction should have a valid epoch
4121        let _ = epoch2; // Use the variable
4122
4123        // Commit and verify
4124        session.commit().unwrap();
4125        let (epoch3, tx_id3) = session.get_transaction_context();
4126        assert!(tx_id3.is_none());
4127        // Epoch should have advanced after commit
4128        assert!(epoch3.as_u64() >= epoch2.as_u64());
4129    }
4130
4131    #[test]
4132    fn test_session_rollback() {
4133        let db = GrafeoDB::new_in_memory();
4134        let mut session = db.session();
4135
4136        session.begin_transaction().unwrap();
4137        session.rollback().unwrap();
4138        assert!(!session.in_transaction());
4139    }
4140
4141    #[test]
4142    fn test_session_rollback_discards_versions() {
4143        use grafeo_common::types::TransactionId;
4144
4145        let db = GrafeoDB::new_in_memory();
4146
4147        // Create a node outside of any transaction (at system level)
4148        let node_before = db.store().create_node(&["Person"]);
4149        assert!(node_before.is_valid());
4150        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4151
4152        // Start a transaction
4153        let mut session = db.session();
4154        session.begin_transaction().unwrap();
4155        let transaction_id = session.current_transaction.lock().unwrap();
4156
4157        // Create a node versioned with the transaction's ID
4158        let epoch = db.store().current_epoch();
4159        let node_in_tx = db
4160            .store()
4161            .create_node_versioned(&["Person"], epoch, transaction_id);
4162        assert!(node_in_tx.is_valid());
4163
4164        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4165        // non-versioned lookups like node_count(). Verify the node is visible
4166        // only through the owning transaction.
4167        assert_eq!(
4168            db.node_count(),
4169            1,
4170            "PENDING nodes should be invisible to non-versioned node_count()"
4171        );
4172        assert!(
4173            db.store()
4174                .get_node_versioned(node_in_tx, epoch, transaction_id)
4175                .is_some(),
4176            "Transaction node should be visible to its own transaction"
4177        );
4178
4179        // Rollback the transaction
4180        session.rollback().unwrap();
4181        assert!(!session.in_transaction());
4182
4183        // The node created in the transaction should be discarded
4184        // Only the first node should remain visible
4185        let count_after = db.node_count();
4186        assert_eq!(
4187            count_after, 1,
4188            "Rollback should discard uncommitted node, but got {count_after}"
4189        );
4190
4191        // The original node should still be accessible
4192        let current_epoch = db.store().current_epoch();
4193        assert!(
4194            db.store()
4195                .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
4196                .is_some(),
4197            "Original node should still exist"
4198        );
4199
4200        // The node created in the transaction should not be accessible
4201        assert!(
4202            db.store()
4203                .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
4204                .is_none(),
4205            "Transaction node should be gone"
4206        );
4207    }
4208
4209    #[test]
4210    fn test_session_create_node_in_transaction() {
4211        // Test that session.create_node() is transaction-aware
4212        let db = GrafeoDB::new_in_memory();
4213
4214        // Create a node outside of any transaction
4215        let node_before = db.create_node(&["Person"]);
4216        assert!(node_before.is_valid());
4217        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4218
4219        // Start a transaction and create a node through the session
4220        let mut session = db.session();
4221        session.begin_transaction().unwrap();
4222        let transaction_id = session.current_transaction.lock().unwrap();
4223
4224        // Create a node through session.create_node() - should be versioned with tx
4225        let node_in_tx = session.create_node(&["Person"]);
4226        assert!(node_in_tx.is_valid());
4227
4228        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4229        // non-versioned lookups. Verify the node is visible only to its own tx.
4230        assert_eq!(
4231            db.node_count(),
4232            1,
4233            "PENDING nodes should be invisible to non-versioned node_count()"
4234        );
4235        let epoch = db.store().current_epoch();
4236        assert!(
4237            db.store()
4238                .get_node_versioned(node_in_tx, epoch, transaction_id)
4239                .is_some(),
4240            "Transaction node should be visible to its own transaction"
4241        );
4242
4243        // Rollback the transaction
4244        session.rollback().unwrap();
4245
4246        // The node created via session.create_node() should be discarded
4247        let count_after = db.node_count();
4248        assert_eq!(
4249            count_after, 1,
4250            "Rollback should discard node created via session.create_node(), but got {count_after}"
4251        );
4252    }
4253
4254    #[test]
4255    fn test_session_create_node_with_props_in_transaction() {
4256        use grafeo_common::types::Value;
4257
4258        // Test that session.create_node_with_props() is transaction-aware
4259        let db = GrafeoDB::new_in_memory();
4260
4261        // Create a node outside of any transaction
4262        db.create_node(&["Person"]);
4263        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4264
4265        // Start a transaction and create a node with properties
4266        let mut session = db.session();
4267        session.begin_transaction().unwrap();
4268        let transaction_id = session.current_transaction.lock().unwrap();
4269
4270        let node_in_tx =
4271            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4272        assert!(node_in_tx.is_valid());
4273
4274        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4275        // non-versioned lookups. Verify the node is visible only to its own tx.
4276        assert_eq!(
4277            db.node_count(),
4278            1,
4279            "PENDING nodes should be invisible to non-versioned node_count()"
4280        );
4281        let epoch = db.store().current_epoch();
4282        assert!(
4283            db.store()
4284                .get_node_versioned(node_in_tx, epoch, transaction_id)
4285                .is_some(),
4286            "Transaction node should be visible to its own transaction"
4287        );
4288
4289        // Rollback the transaction
4290        session.rollback().unwrap();
4291
4292        // The node should be discarded
4293        let count_after = db.node_count();
4294        assert_eq!(
4295            count_after, 1,
4296            "Rollback should discard node created via session.create_node_with_props()"
4297        );
4298    }
4299
4300    #[cfg(feature = "gql")]
4301    mod gql_tests {
4302        use super::*;
4303
4304        #[test]
4305        fn test_gql_query_execution() {
4306            let db = GrafeoDB::new_in_memory();
4307            let session = db.session();
4308
4309            // Create some test data
4310            session.create_node(&["Person"]);
4311            session.create_node(&["Person"]);
4312            session.create_node(&["Animal"]);
4313
4314            // Execute a GQL query
4315            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
4316
4317            // Should return 2 Person nodes
4318            assert_eq!(result.row_count(), 2);
4319            assert_eq!(result.column_count(), 1);
4320            assert_eq!(result.columns[0], "n");
4321        }
4322
4323        #[test]
4324        fn test_gql_empty_result() {
4325            let db = GrafeoDB::new_in_memory();
4326            let session = db.session();
4327
4328            // No data in database
4329            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
4330
4331            assert_eq!(result.row_count(), 0);
4332        }
4333
4334        #[test]
4335        fn test_gql_parse_error() {
4336            let db = GrafeoDB::new_in_memory();
4337            let session = db.session();
4338
4339            // Invalid GQL syntax
4340            let result = session.execute("MATCH (n RETURN n");
4341
4342            assert!(result.is_err());
4343        }
4344
4345        #[test]
4346        fn test_gql_relationship_traversal() {
4347            let db = GrafeoDB::new_in_memory();
4348            let session = db.session();
4349
4350            // Create a graph: Alix -> Gus, Alix -> Vincent
4351            let alix = session.create_node(&["Person"]);
4352            let gus = session.create_node(&["Person"]);
4353            let vincent = session.create_node(&["Person"]);
4354
4355            session.create_edge(alix, gus, "KNOWS");
4356            session.create_edge(alix, vincent, "KNOWS");
4357
4358            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
4359            let result = session
4360                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4361                .unwrap();
4362
4363            // Should return 2 rows (Alix->Gus, Alix->Vincent)
4364            assert_eq!(result.row_count(), 2);
4365            assert_eq!(result.column_count(), 2);
4366            assert_eq!(result.columns[0], "a");
4367            assert_eq!(result.columns[1], "b");
4368        }
4369
4370        #[test]
4371        fn test_gql_relationship_with_type_filter() {
4372            let db = GrafeoDB::new_in_memory();
4373            let session = db.session();
4374
4375            // Create a graph: Alix -KNOWS-> Gus, Alix -WORKS_WITH-> Vincent
4376            let alix = session.create_node(&["Person"]);
4377            let gus = session.create_node(&["Person"]);
4378            let vincent = session.create_node(&["Person"]);
4379
4380            session.create_edge(alix, gus, "KNOWS");
4381            session.create_edge(alix, vincent, "WORKS_WITH");
4382
4383            // Query only KNOWS relationships
4384            let result = session
4385                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4386                .unwrap();
4387
4388            // Should return only 1 row (Alix->Gus)
4389            assert_eq!(result.row_count(), 1);
4390        }
4391
4392        #[test]
4393        fn test_gql_semantic_error_undefined_variable() {
4394            let db = GrafeoDB::new_in_memory();
4395            let session = db.session();
4396
4397            // Reference undefined variable 'x' in RETURN
4398            let result = session.execute("MATCH (n:Person) RETURN x");
4399
4400            // Should fail with semantic error
4401            assert!(result.is_err());
4402            let Err(err) = result else {
4403                panic!("Expected error")
4404            };
4405            assert!(
4406                err.to_string().contains("Undefined variable"),
4407                "Expected undefined variable error, got: {}",
4408                err
4409            );
4410        }
4411
4412        #[test]
4413        fn test_gql_where_clause_property_filter() {
4414            use grafeo_common::types::Value;
4415
4416            let db = GrafeoDB::new_in_memory();
4417            let session = db.session();
4418
4419            // Create people with ages
4420            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
4421            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
4422            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
4423
4424            // Query with WHERE clause: age > 30
4425            let result = session
4426                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
4427                .unwrap();
4428
4429            // Should return 2 people (ages 35 and 45)
4430            assert_eq!(result.row_count(), 2);
4431        }
4432
4433        #[test]
4434        fn test_gql_where_clause_equality() {
4435            use grafeo_common::types::Value;
4436
4437            let db = GrafeoDB::new_in_memory();
4438            let session = db.session();
4439
4440            // Create people with names
4441            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4442            session.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
4443            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4444
4445            // Query with WHERE clause: name = "Alix"
4446            let result = session
4447                .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
4448                .unwrap();
4449
4450            // Should return 2 people named Alix
4451            assert_eq!(result.row_count(), 2);
4452        }
4453
4454        #[test]
4455        fn test_gql_return_property_access() {
4456            use grafeo_common::types::Value;
4457
4458            let db = GrafeoDB::new_in_memory();
4459            let session = db.session();
4460
4461            // Create people with names and ages
4462            session.create_node_with_props(
4463                &["Person"],
4464                [
4465                    ("name", Value::String("Alix".into())),
4466                    ("age", Value::Int64(30)),
4467                ],
4468            );
4469            session.create_node_with_props(
4470                &["Person"],
4471                [
4472                    ("name", Value::String("Gus".into())),
4473                    ("age", Value::Int64(25)),
4474                ],
4475            );
4476
4477            // Query returning properties
4478            let result = session
4479                .execute("MATCH (n:Person) RETURN n.name, n.age")
4480                .unwrap();
4481
4482            // Should return 2 rows with name and age columns
4483            assert_eq!(result.row_count(), 2);
4484            assert_eq!(result.column_count(), 2);
4485            assert_eq!(result.columns[0], "n.name");
4486            assert_eq!(result.columns[1], "n.age");
4487
4488            // Check that we get actual values
4489            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
4490            assert!(names.contains(&&Value::String("Alix".into())));
4491            assert!(names.contains(&&Value::String("Gus".into())));
4492        }
4493
4494        #[test]
4495        fn test_gql_return_mixed_expressions() {
4496            use grafeo_common::types::Value;
4497
4498            let db = GrafeoDB::new_in_memory();
4499            let session = db.session();
4500
4501            // Create a person
4502            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4503
4504            // Query returning both node and property
4505            let result = session
4506                .execute("MATCH (n:Person) RETURN n, n.name")
4507                .unwrap();
4508
4509            assert_eq!(result.row_count(), 1);
4510            assert_eq!(result.column_count(), 2);
4511            assert_eq!(result.columns[0], "n");
4512            assert_eq!(result.columns[1], "n.name");
4513
4514            // Second column should be the name
4515            assert_eq!(result.rows[0][1], Value::String("Alix".into()));
4516        }
4517    }
4518
4519    #[cfg(feature = "cypher")]
4520    mod cypher_tests {
4521        use super::*;
4522
4523        #[test]
4524        fn test_cypher_query_execution() {
4525            let db = GrafeoDB::new_in_memory();
4526            let session = db.session();
4527
4528            // Create some test data
4529            session.create_node(&["Person"]);
4530            session.create_node(&["Person"]);
4531            session.create_node(&["Animal"]);
4532
4533            // Execute a Cypher query
4534            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4535
4536            // Should return 2 Person nodes
4537            assert_eq!(result.row_count(), 2);
4538            assert_eq!(result.column_count(), 1);
4539            assert_eq!(result.columns[0], "n");
4540        }
4541
4542        #[test]
4543        fn test_cypher_empty_result() {
4544            let db = GrafeoDB::new_in_memory();
4545            let session = db.session();
4546
4547            // No data in database
4548            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4549
4550            assert_eq!(result.row_count(), 0);
4551        }
4552
4553        #[test]
4554        fn test_cypher_parse_error() {
4555            let db = GrafeoDB::new_in_memory();
4556            let session = db.session();
4557
4558            // Invalid Cypher syntax
4559            let result = session.execute_cypher("MATCH (n RETURN n");
4560
4561            assert!(result.is_err());
4562        }
4563    }
4564
4565    // ==================== Direct Lookup API Tests ====================
4566
4567    mod direct_lookup_tests {
4568        use super::*;
4569        use grafeo_common::types::Value;
4570
4571        #[test]
4572        fn test_get_node() {
4573            let db = GrafeoDB::new_in_memory();
4574            let session = db.session();
4575
4576            let id = session.create_node(&["Person"]);
4577            let node = session.get_node(id);
4578
4579            assert!(node.is_some());
4580            let node = node.unwrap();
4581            assert_eq!(node.id, id);
4582        }
4583
4584        #[test]
4585        fn test_get_node_not_found() {
4586            use grafeo_common::types::NodeId;
4587
4588            let db = GrafeoDB::new_in_memory();
4589            let session = db.session();
4590
4591            // Try to get a non-existent node
4592            let node = session.get_node(NodeId::new(9999));
4593            assert!(node.is_none());
4594        }
4595
4596        #[test]
4597        fn test_get_node_property() {
4598            let db = GrafeoDB::new_in_memory();
4599            let session = db.session();
4600
4601            let id = session
4602                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4603
4604            let name = session.get_node_property(id, "name");
4605            assert_eq!(name, Some(Value::String("Alix".into())));
4606
4607            // Non-existent property
4608            let missing = session.get_node_property(id, "missing");
4609            assert!(missing.is_none());
4610        }
4611
4612        #[test]
4613        fn test_get_edge() {
4614            let db = GrafeoDB::new_in_memory();
4615            let session = db.session();
4616
4617            let alix = session.create_node(&["Person"]);
4618            let gus = session.create_node(&["Person"]);
4619            let edge_id = session.create_edge(alix, gus, "KNOWS");
4620
4621            let edge = session.get_edge(edge_id);
4622            assert!(edge.is_some());
4623            let edge = edge.unwrap();
4624            assert_eq!(edge.id, edge_id);
4625            assert_eq!(edge.src, alix);
4626            assert_eq!(edge.dst, gus);
4627        }
4628
4629        #[test]
4630        fn test_get_edge_not_found() {
4631            use grafeo_common::types::EdgeId;
4632
4633            let db = GrafeoDB::new_in_memory();
4634            let session = db.session();
4635
4636            let edge = session.get_edge(EdgeId::new(9999));
4637            assert!(edge.is_none());
4638        }
4639
4640        #[test]
4641        fn test_get_neighbors_outgoing() {
4642            let db = GrafeoDB::new_in_memory();
4643            let session = db.session();
4644
4645            let alix = session.create_node(&["Person"]);
4646            let gus = session.create_node(&["Person"]);
4647            let harm = session.create_node(&["Person"]);
4648
4649            session.create_edge(alix, gus, "KNOWS");
4650            session.create_edge(alix, harm, "KNOWS");
4651
4652            let neighbors = session.get_neighbors_outgoing(alix);
4653            assert_eq!(neighbors.len(), 2);
4654
4655            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4656            assert!(neighbor_ids.contains(&gus));
4657            assert!(neighbor_ids.contains(&harm));
4658        }
4659
4660        #[test]
4661        fn test_get_neighbors_incoming() {
4662            let db = GrafeoDB::new_in_memory();
4663            let session = db.session();
4664
4665            let alix = session.create_node(&["Person"]);
4666            let gus = session.create_node(&["Person"]);
4667            let harm = session.create_node(&["Person"]);
4668
4669            session.create_edge(gus, alix, "KNOWS");
4670            session.create_edge(harm, alix, "KNOWS");
4671
4672            let neighbors = session.get_neighbors_incoming(alix);
4673            assert_eq!(neighbors.len(), 2);
4674
4675            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4676            assert!(neighbor_ids.contains(&gus));
4677            assert!(neighbor_ids.contains(&harm));
4678        }
4679
4680        #[test]
4681        fn test_get_neighbors_outgoing_by_type() {
4682            let db = GrafeoDB::new_in_memory();
4683            let session = db.session();
4684
4685            let alix = session.create_node(&["Person"]);
4686            let gus = session.create_node(&["Person"]);
4687            let company = session.create_node(&["Company"]);
4688
4689            session.create_edge(alix, gus, "KNOWS");
4690            session.create_edge(alix, company, "WORKS_AT");
4691
4692            let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
4693            assert_eq!(knows_neighbors.len(), 1);
4694            assert_eq!(knows_neighbors[0].0, gus);
4695
4696            let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
4697            assert_eq!(works_neighbors.len(), 1);
4698            assert_eq!(works_neighbors[0].0, company);
4699
4700            // No edges of this type
4701            let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
4702            assert!(no_neighbors.is_empty());
4703        }
4704
4705        #[test]
4706        fn test_node_exists() {
4707            use grafeo_common::types::NodeId;
4708
4709            let db = GrafeoDB::new_in_memory();
4710            let session = db.session();
4711
4712            let id = session.create_node(&["Person"]);
4713
4714            assert!(session.node_exists(id));
4715            assert!(!session.node_exists(NodeId::new(9999)));
4716        }
4717
4718        #[test]
4719        fn test_edge_exists() {
4720            use grafeo_common::types::EdgeId;
4721
4722            let db = GrafeoDB::new_in_memory();
4723            let session = db.session();
4724
4725            let alix = session.create_node(&["Person"]);
4726            let gus = session.create_node(&["Person"]);
4727            let edge_id = session.create_edge(alix, gus, "KNOWS");
4728
4729            assert!(session.edge_exists(edge_id));
4730            assert!(!session.edge_exists(EdgeId::new(9999)));
4731        }
4732
4733        #[test]
4734        fn test_get_degree() {
4735            let db = GrafeoDB::new_in_memory();
4736            let session = db.session();
4737
4738            let alix = session.create_node(&["Person"]);
4739            let gus = session.create_node(&["Person"]);
4740            let harm = session.create_node(&["Person"]);
4741
4742            // Alix knows Gus and Harm (2 outgoing)
4743            session.create_edge(alix, gus, "KNOWS");
4744            session.create_edge(alix, harm, "KNOWS");
4745            // Gus knows Alix (1 incoming for Alix)
4746            session.create_edge(gus, alix, "KNOWS");
4747
4748            let (out_degree, in_degree) = session.get_degree(alix);
4749            assert_eq!(out_degree, 2);
4750            assert_eq!(in_degree, 1);
4751
4752            // Node with no edges
4753            let lonely = session.create_node(&["Person"]);
4754            let (out, in_deg) = session.get_degree(lonely);
4755            assert_eq!(out, 0);
4756            assert_eq!(in_deg, 0);
4757        }
4758
4759        #[test]
4760        fn test_get_nodes_batch() {
4761            let db = GrafeoDB::new_in_memory();
4762            let session = db.session();
4763
4764            let alix = session.create_node(&["Person"]);
4765            let gus = session.create_node(&["Person"]);
4766            let harm = session.create_node(&["Person"]);
4767
4768            let nodes = session.get_nodes_batch(&[alix, gus, harm]);
4769            assert_eq!(nodes.len(), 3);
4770            assert!(nodes[0].is_some());
4771            assert!(nodes[1].is_some());
4772            assert!(nodes[2].is_some());
4773
4774            // With non-existent node
4775            use grafeo_common::types::NodeId;
4776            let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
4777            assert_eq!(nodes_with_missing.len(), 3);
4778            assert!(nodes_with_missing[0].is_some());
4779            assert!(nodes_with_missing[1].is_none()); // Missing node
4780            assert!(nodes_with_missing[2].is_some());
4781        }
4782
4783        #[test]
4784        fn test_auto_commit_setting() {
4785            let db = GrafeoDB::new_in_memory();
4786            let mut session = db.session();
4787
4788            // Default is auto-commit enabled
4789            assert!(session.auto_commit());
4790
4791            session.set_auto_commit(false);
4792            assert!(!session.auto_commit());
4793
4794            session.set_auto_commit(true);
4795            assert!(session.auto_commit());
4796        }
4797
4798        #[test]
4799        fn test_transaction_double_begin_nests() {
4800            let db = GrafeoDB::new_in_memory();
4801            let mut session = db.session();
4802
4803            session.begin_transaction().unwrap();
4804            // Second begin_transaction creates a nested transaction (auto-savepoint)
4805            let result = session.begin_transaction();
4806            assert!(result.is_ok());
4807            // Commit the inner (releases savepoint)
4808            session.commit().unwrap();
4809            // Commit the outer
4810            session.commit().unwrap();
4811        }
4812
4813        #[test]
4814        fn test_commit_without_transaction_error() {
4815            let db = GrafeoDB::new_in_memory();
4816            let mut session = db.session();
4817
4818            let result = session.commit();
4819            assert!(result.is_err());
4820        }
4821
4822        #[test]
4823        fn test_rollback_without_transaction_error() {
4824            let db = GrafeoDB::new_in_memory();
4825            let mut session = db.session();
4826
4827            let result = session.rollback();
4828            assert!(result.is_err());
4829        }
4830
4831        #[test]
4832        fn test_create_edge_in_transaction() {
4833            let db = GrafeoDB::new_in_memory();
4834            let mut session = db.session();
4835
4836            // Create nodes outside transaction
4837            let alix = session.create_node(&["Person"]);
4838            let gus = session.create_node(&["Person"]);
4839
4840            // Create edge in transaction
4841            session.begin_transaction().unwrap();
4842            let edge_id = session.create_edge(alix, gus, "KNOWS");
4843
4844            // Edge should be visible in the transaction
4845            assert!(session.edge_exists(edge_id));
4846
4847            // Commit
4848            session.commit().unwrap();
4849
4850            // Edge should still be visible
4851            assert!(session.edge_exists(edge_id));
4852        }
4853
4854        #[test]
4855        fn test_neighbors_empty_node() {
4856            let db = GrafeoDB::new_in_memory();
4857            let session = db.session();
4858
4859            let lonely = session.create_node(&["Person"]);
4860
4861            assert!(session.get_neighbors_outgoing(lonely).is_empty());
4862            assert!(session.get_neighbors_incoming(lonely).is_empty());
4863            assert!(
4864                session
4865                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
4866                    .is_empty()
4867            );
4868        }
4869    }
4870
4871    #[test]
4872    fn test_auto_gc_triggers_on_commit_interval() {
4873        use crate::config::Config;
4874
4875        let config = Config::in_memory().with_gc_interval(2);
4876        let db = GrafeoDB::with_config(config).unwrap();
4877        let mut session = db.session();
4878
4879        // First commit: counter = 1, no GC (not a multiple of 2)
4880        session.begin_transaction().unwrap();
4881        session.create_node(&["A"]);
4882        session.commit().unwrap();
4883
4884        // Second commit: counter = 2, GC should trigger (multiple of 2)
4885        session.begin_transaction().unwrap();
4886        session.create_node(&["B"]);
4887        session.commit().unwrap();
4888
4889        // Verify the database is still functional after GC
4890        assert_eq!(db.node_count(), 2);
4891    }
4892
4893    #[test]
4894    fn test_query_timeout_config_propagates_to_session() {
4895        use crate::config::Config;
4896        use std::time::Duration;
4897
4898        let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
4899        let db = GrafeoDB::with_config(config).unwrap();
4900        let session = db.session();
4901
4902        // Verify the session has a query deadline (timeout was set)
4903        assert!(session.query_deadline().is_some());
4904    }
4905
4906    #[test]
4907    fn test_no_query_timeout_returns_no_deadline() {
4908        let db = GrafeoDB::new_in_memory();
4909        let session = db.session();
4910
4911        // Default config has no timeout
4912        assert!(session.query_deadline().is_none());
4913    }
4914
4915    #[test]
4916    fn test_graph_model_accessor() {
4917        use crate::config::GraphModel;
4918
4919        let db = GrafeoDB::new_in_memory();
4920        let session = db.session();
4921
4922        assert_eq!(session.graph_model(), GraphModel::Lpg);
4923    }
4924
4925    #[cfg(feature = "gql")]
4926    #[test]
4927    fn test_external_store_session() {
4928        use grafeo_core::graph::GraphStoreMut;
4929        use std::sync::Arc;
4930
4931        let config = crate::config::Config::in_memory();
4932        let store =
4933            Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
4934        let db = GrafeoDB::with_store(store, config).unwrap();
4935
4936        let mut session = db.session();
4937
4938        // Use an explicit transaction so that INSERT and MATCH share the same
4939        // transaction context. With PENDING epochs, uncommitted versions are
4940        // only visible to the owning transaction.
4941        session.begin_transaction().unwrap();
4942        session.execute("INSERT (:Test {name: 'hello'})").unwrap();
4943
4944        // Verify we can query through it within the same transaction
4945        let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
4946        assert_eq!(result.row_count(), 1);
4947
4948        session.commit().unwrap();
4949    }
4950
4951    // ==================== Session Command Tests ====================
4952
4953    #[cfg(feature = "gql")]
4954    mod session_command_tests {
4955        use super::*;
4956        use grafeo_common::types::Value;
4957
4958        #[test]
4959        fn test_use_graph_sets_current_graph() {
4960            let db = GrafeoDB::new_in_memory();
4961            let session = db.session();
4962
4963            // Create the graph first, then USE it
4964            session.execute("CREATE GRAPH mydb").unwrap();
4965            session.execute("USE GRAPH mydb").unwrap();
4966
4967            assert_eq!(session.current_graph(), Some("mydb".to_string()));
4968        }
4969
4970        #[test]
4971        fn test_use_graph_nonexistent_errors() {
4972            let db = GrafeoDB::new_in_memory();
4973            let session = db.session();
4974
4975            let result = session.execute("USE GRAPH doesnotexist");
4976            assert!(result.is_err());
4977            let err = result.unwrap_err().to_string();
4978            assert!(
4979                err.contains("does not exist"),
4980                "Expected 'does not exist' error, got: {err}"
4981            );
4982        }
4983
4984        #[test]
4985        fn test_use_graph_default_always_valid() {
4986            let db = GrafeoDB::new_in_memory();
4987            let session = db.session();
4988
4989            // "default" is always valid, even without CREATE GRAPH
4990            session.execute("USE GRAPH default").unwrap();
4991            assert_eq!(session.current_graph(), Some("default".to_string()));
4992        }
4993
4994        #[test]
4995        fn test_session_set_graph() {
4996            let db = GrafeoDB::new_in_memory();
4997            let session = db.session();
4998
4999            session.execute("CREATE GRAPH analytics").unwrap();
5000            session.execute("SESSION SET GRAPH analytics").unwrap();
5001            assert_eq!(session.current_graph(), Some("analytics".to_string()));
5002        }
5003
5004        #[test]
5005        fn test_session_set_graph_nonexistent_errors() {
5006            let db = GrafeoDB::new_in_memory();
5007            let session = db.session();
5008
5009            let result = session.execute("SESSION SET GRAPH nosuchgraph");
5010            assert!(result.is_err());
5011        }
5012
5013        #[test]
5014        fn test_session_set_time_zone() {
5015            let db = GrafeoDB::new_in_memory();
5016            let session = db.session();
5017
5018            assert_eq!(session.time_zone(), None);
5019
5020            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5021            assert_eq!(session.time_zone(), Some("UTC".to_string()));
5022
5023            session
5024                .execute("SESSION SET TIME ZONE 'America/New_York'")
5025                .unwrap();
5026            assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
5027        }
5028
5029        #[test]
5030        fn test_session_set_parameter() {
5031            let db = GrafeoDB::new_in_memory();
5032            let session = db.session();
5033
5034            session
5035                .execute("SESSION SET PARAMETER $timeout = 30")
5036                .unwrap();
5037
5038            // Parameter is stored (value is Null for now, since expression
5039            // evaluation is not yet wired up)
5040            assert!(session.get_parameter("timeout").is_some());
5041        }
5042
5043        #[test]
5044        fn test_session_reset_clears_all_state() {
5045            let db = GrafeoDB::new_in_memory();
5046            let session = db.session();
5047
5048            // Set various session state
5049            session.execute("CREATE GRAPH analytics").unwrap();
5050            session.execute("SESSION SET GRAPH analytics").unwrap();
5051            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5052            session
5053                .execute("SESSION SET PARAMETER $limit = 100")
5054                .unwrap();
5055
5056            // Verify state was set
5057            assert!(session.current_graph().is_some());
5058            assert!(session.time_zone().is_some());
5059            assert!(session.get_parameter("limit").is_some());
5060
5061            // Reset everything
5062            session.execute("SESSION RESET").unwrap();
5063
5064            assert_eq!(session.current_graph(), None);
5065            assert_eq!(session.time_zone(), None);
5066            assert!(session.get_parameter("limit").is_none());
5067        }
5068
5069        #[test]
5070        fn test_session_close_clears_state() {
5071            let db = GrafeoDB::new_in_memory();
5072            let session = db.session();
5073
5074            session.execute("CREATE GRAPH analytics").unwrap();
5075            session.execute("SESSION SET GRAPH analytics").unwrap();
5076            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5077
5078            session.execute("SESSION CLOSE").unwrap();
5079
5080            assert_eq!(session.current_graph(), None);
5081            assert_eq!(session.time_zone(), None);
5082        }
5083
5084        #[test]
5085        fn test_create_graph() {
5086            let db = GrafeoDB::new_in_memory();
5087            let session = db.session();
5088
5089            session.execute("CREATE GRAPH mydb").unwrap();
5090
5091            // Should be able to USE it now
5092            session.execute("USE GRAPH mydb").unwrap();
5093            assert_eq!(session.current_graph(), Some("mydb".to_string()));
5094        }
5095
5096        #[test]
5097        fn test_create_graph_duplicate_errors() {
5098            let db = GrafeoDB::new_in_memory();
5099            let session = db.session();
5100
5101            session.execute("CREATE GRAPH mydb").unwrap();
5102            let result = session.execute("CREATE GRAPH mydb");
5103
5104            assert!(result.is_err());
5105            let err = result.unwrap_err().to_string();
5106            assert!(
5107                err.contains("already exists"),
5108                "Expected 'already exists' error, got: {err}"
5109            );
5110        }
5111
5112        #[test]
5113        fn test_create_graph_if_not_exists() {
5114            let db = GrafeoDB::new_in_memory();
5115            let session = db.session();
5116
5117            session.execute("CREATE GRAPH mydb").unwrap();
5118            // Should succeed silently with IF NOT EXISTS
5119            session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
5120        }
5121
5122        #[test]
5123        fn test_drop_graph() {
5124            let db = GrafeoDB::new_in_memory();
5125            let session = db.session();
5126
5127            session.execute("CREATE GRAPH mydb").unwrap();
5128            session.execute("DROP GRAPH mydb").unwrap();
5129
5130            // Should no longer be usable
5131            let result = session.execute("USE GRAPH mydb");
5132            assert!(result.is_err());
5133        }
5134
5135        #[test]
5136        fn test_drop_graph_nonexistent_errors() {
5137            let db = GrafeoDB::new_in_memory();
5138            let session = db.session();
5139
5140            let result = session.execute("DROP GRAPH nosuchgraph");
5141            assert!(result.is_err());
5142            let err = result.unwrap_err().to_string();
5143            assert!(
5144                err.contains("does not exist"),
5145                "Expected 'does not exist' error, got: {err}"
5146            );
5147        }
5148
5149        #[test]
5150        fn test_drop_graph_if_exists() {
5151            let db = GrafeoDB::new_in_memory();
5152            let session = db.session();
5153
5154            // Should succeed silently with IF EXISTS
5155            session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
5156        }
5157
5158        #[test]
5159        fn test_start_transaction_via_gql() {
5160            let db = GrafeoDB::new_in_memory();
5161            let session = db.session();
5162
5163            session.execute("START TRANSACTION").unwrap();
5164            assert!(session.in_transaction());
5165            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5166            session.execute("COMMIT").unwrap();
5167            assert!(!session.in_transaction());
5168
5169            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5170            assert_eq!(result.rows.len(), 1);
5171        }
5172
5173        #[test]
5174        fn test_start_transaction_read_only_blocks_insert() {
5175            let db = GrafeoDB::new_in_memory();
5176            let session = db.session();
5177
5178            session.execute("START TRANSACTION READ ONLY").unwrap();
5179            let result = session.execute("INSERT (:Person {name: 'Alix'})");
5180            assert!(result.is_err());
5181            let err = result.unwrap_err().to_string();
5182            assert!(
5183                err.contains("read-only"),
5184                "Expected read-only error, got: {err}"
5185            );
5186            session.execute("ROLLBACK").unwrap();
5187        }
5188
5189        #[test]
5190        fn test_start_transaction_read_only_allows_reads() {
5191            let db = GrafeoDB::new_in_memory();
5192            let mut session = db.session();
5193            session.begin_transaction().unwrap();
5194            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5195            session.commit().unwrap();
5196
5197            session.execute("START TRANSACTION READ ONLY").unwrap();
5198            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5199            assert_eq!(result.rows.len(), 1);
5200            session.execute("COMMIT").unwrap();
5201        }
5202
5203        #[test]
5204        fn test_rollback_via_gql() {
5205            let db = GrafeoDB::new_in_memory();
5206            let session = db.session();
5207
5208            session.execute("START TRANSACTION").unwrap();
5209            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5210            session.execute("ROLLBACK").unwrap();
5211
5212            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5213            assert!(result.rows.is_empty());
5214        }
5215
5216        #[test]
5217        fn test_start_transaction_with_isolation_level() {
5218            let db = GrafeoDB::new_in_memory();
5219            let session = db.session();
5220
5221            session
5222                .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
5223                .unwrap();
5224            assert!(session.in_transaction());
5225            session.execute("ROLLBACK").unwrap();
5226        }
5227
5228        #[test]
5229        fn test_session_commands_return_empty_result() {
5230            let db = GrafeoDB::new_in_memory();
5231            let session = db.session();
5232
5233            session.execute("CREATE GRAPH test").unwrap();
5234            let result = session.execute("SESSION SET GRAPH test").unwrap();
5235            assert_eq!(result.row_count(), 0);
5236            assert_eq!(result.column_count(), 0);
5237        }
5238
5239        #[test]
5240        fn test_current_graph_default_is_none() {
5241            let db = GrafeoDB::new_in_memory();
5242            let session = db.session();
5243
5244            assert_eq!(session.current_graph(), None);
5245        }
5246
5247        #[test]
5248        fn test_time_zone_default_is_none() {
5249            let db = GrafeoDB::new_in_memory();
5250            let session = db.session();
5251
5252            assert_eq!(session.time_zone(), None);
5253        }
5254
5255        #[test]
5256        fn test_session_state_independent_across_sessions() {
5257            let db = GrafeoDB::new_in_memory();
5258            let session1 = db.session();
5259            let session2 = db.session();
5260
5261            session1.execute("CREATE GRAPH first").unwrap();
5262            session1.execute("CREATE GRAPH second").unwrap();
5263            session1.execute("SESSION SET GRAPH first").unwrap();
5264            session2.execute("SESSION SET GRAPH second").unwrap();
5265
5266            assert_eq!(session1.current_graph(), Some("first".to_string()));
5267            assert_eq!(session2.current_graph(), Some("second".to_string()));
5268        }
5269
5270        #[test]
5271        fn test_show_node_types() {
5272            let db = GrafeoDB::new_in_memory();
5273            let session = db.session();
5274
5275            session
5276                .execute("CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)")
5277                .unwrap();
5278
5279            let result = session.execute("SHOW NODE TYPES").unwrap();
5280            assert_eq!(
5281                result.columns,
5282                vec!["name", "properties", "constraints", "parents"]
5283            );
5284            assert_eq!(result.rows.len(), 1);
5285            // First column is the type name
5286            assert_eq!(result.rows[0][0], Value::from("Person"));
5287        }
5288
5289        #[test]
5290        fn test_show_edge_types() {
5291            let db = GrafeoDB::new_in_memory();
5292            let session = db.session();
5293
5294            session
5295                .execute("CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)")
5296                .unwrap();
5297
5298            let result = session.execute("SHOW EDGE TYPES").unwrap();
5299            assert_eq!(
5300                result.columns,
5301                vec!["name", "properties", "source_types", "target_types"]
5302            );
5303            assert_eq!(result.rows.len(), 1);
5304            assert_eq!(result.rows[0][0], Value::from("KNOWS"));
5305        }
5306
5307        #[test]
5308        fn test_show_graph_types() {
5309            let db = GrafeoDB::new_in_memory();
5310            let session = db.session();
5311
5312            session
5313                .execute("CREATE NODE TYPE Person (name STRING)")
5314                .unwrap();
5315            session
5316                .execute(
5317                    "CREATE GRAPH TYPE social (\
5318                        NODE TYPE Person (name STRING)\
5319                    )",
5320                )
5321                .unwrap();
5322
5323            let result = session.execute("SHOW GRAPH TYPES").unwrap();
5324            assert_eq!(
5325                result.columns,
5326                vec!["name", "open", "node_types", "edge_types"]
5327            );
5328            assert_eq!(result.rows.len(), 1);
5329            assert_eq!(result.rows[0][0], Value::from("social"));
5330        }
5331
5332        #[test]
5333        fn test_show_graph_type_named() {
5334            let db = GrafeoDB::new_in_memory();
5335            let session = db.session();
5336
5337            session
5338                .execute("CREATE NODE TYPE Person (name STRING)")
5339                .unwrap();
5340            session
5341                .execute(
5342                    "CREATE GRAPH TYPE social (\
5343                        NODE TYPE Person (name STRING)\
5344                    )",
5345                )
5346                .unwrap();
5347
5348            let result = session.execute("SHOW GRAPH TYPE social").unwrap();
5349            assert_eq!(result.rows.len(), 1);
5350            assert_eq!(result.rows[0][0], Value::from("social"));
5351        }
5352
5353        #[test]
5354        fn test_show_graph_type_not_found() {
5355            let db = GrafeoDB::new_in_memory();
5356            let session = db.session();
5357
5358            let result = session.execute("SHOW GRAPH TYPE nonexistent");
5359            assert!(result.is_err());
5360        }
5361
5362        #[test]
5363        fn test_show_indexes_via_gql() {
5364            let db = GrafeoDB::new_in_memory();
5365            let session = db.session();
5366
5367            let result = session.execute("SHOW INDEXES").unwrap();
5368            assert_eq!(result.columns, vec!["name", "type", "label", "property"]);
5369        }
5370
5371        #[test]
5372        fn test_show_constraints_via_gql() {
5373            let db = GrafeoDB::new_in_memory();
5374            let session = db.session();
5375
5376            let result = session.execute("SHOW CONSTRAINTS").unwrap();
5377            assert_eq!(result.columns, vec!["name", "type", "label", "properties"]);
5378        }
5379
5380        #[test]
5381        fn test_pattern_form_graph_type_roundtrip() {
5382            let db = GrafeoDB::new_in_memory();
5383            let session = db.session();
5384
5385            // Register the types first
5386            session
5387                .execute("CREATE NODE TYPE Person (name STRING NOT NULL)")
5388                .unwrap();
5389            session
5390                .execute("CREATE NODE TYPE City (name STRING)")
5391                .unwrap();
5392            session
5393                .execute("CREATE EDGE TYPE KNOWS (since INTEGER)")
5394                .unwrap();
5395            session.execute("CREATE EDGE TYPE LIVES_IN").unwrap();
5396
5397            // Create graph type using pattern form
5398            session
5399                .execute(
5400                    "CREATE GRAPH TYPE social (\
5401                        (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
5402                        (:Person)-[:LIVES_IN]->(:City)\
5403                    )",
5404                )
5405                .unwrap();
5406
5407            // Verify it was created
5408            let result = session.execute("SHOW GRAPH TYPE social").unwrap();
5409            assert_eq!(result.rows.len(), 1);
5410            assert_eq!(result.rows[0][0], Value::from("social"));
5411        }
5412    }
5413}