Skip to main content

grafeo_engine/session/
mod.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7#[cfg(feature = "rdf")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::time::{Duration, Instant};
13
14use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId, Value};
15use grafeo_common::utils::error::Result;
16use grafeo_core::graph::Direction;
17use grafeo_core::graph::GraphStoreMut;
18use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
19#[cfg(feature = "rdf")]
20use grafeo_core::graph::rdf::RdfStore;
21
22use crate::catalog::{Catalog, CatalogConstraintValidator};
23use crate::config::{AdaptiveConfig, GraphModel};
24use crate::database::QueryResult;
25use crate::query::cache::QueryCache;
26use crate::transaction::TransactionManager;
27
28/// Parses a DDL default-value literal string into a [`Value`].
29///
30/// Handles string literals (single- or double-quoted), integers, floats,
31/// booleans (`true`/`false`), and `NULL`.
32fn parse_default_literal(text: &str) -> Value {
33    if text.eq_ignore_ascii_case("null") {
34        return Value::Null;
35    }
36    if text.eq_ignore_ascii_case("true") {
37        return Value::Bool(true);
38    }
39    if text.eq_ignore_ascii_case("false") {
40        return Value::Bool(false);
41    }
42    // String literal: strip surrounding quotes
43    if (text.starts_with('\'') && text.ends_with('\''))
44        || (text.starts_with('"') && text.ends_with('"'))
45    {
46        return Value::String(text[1..text.len() - 1].into());
47    }
48    // Try integer, then float
49    if let Ok(i) = text.parse::<i64>() {
50        return Value::Int64(i);
51    }
52    if let Ok(f) = text.parse::<f64>() {
53        return Value::Float64(f);
54    }
55    // Fallback: treat as string
56    Value::String(text.into())
57}
58
59/// Runtime configuration for creating a new session.
60///
61/// Groups the shared parameters passed to all session constructors, keeping
62/// call sites readable and avoiding long argument lists.
63pub(crate) struct SessionConfig {
64    pub transaction_manager: Arc<TransactionManager>,
65    pub query_cache: Arc<QueryCache>,
66    pub catalog: Arc<Catalog>,
67    pub adaptive_config: AdaptiveConfig,
68    pub factorized_execution: bool,
69    pub graph_model: GraphModel,
70    pub query_timeout: Option<Duration>,
71    pub commit_counter: Arc<AtomicUsize>,
72    pub gc_interval: usize,
73}
74
75/// Your handle to the database - execute queries and manage transactions.
76///
77/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
78/// tracks its own transaction state, so you can have multiple concurrent
79/// sessions without them interfering.
80pub struct Session {
81    /// The underlying store.
82    store: Arc<LpgStore>,
83    /// Graph store trait object for pluggable storage backends.
84    graph_store: Arc<dyn GraphStoreMut>,
85    /// Schema and metadata catalog shared across sessions.
86    catalog: Arc<Catalog>,
87    /// RDF triple store (if RDF feature is enabled).
88    #[cfg(feature = "rdf")]
89    rdf_store: Arc<RdfStore>,
90    /// Transaction manager.
91    transaction_manager: Arc<TransactionManager>,
92    /// Query cache shared across sessions.
93    query_cache: Arc<QueryCache>,
94    /// Current transaction ID (if any). Behind a Mutex so that GQL commands
95    /// (`START TRANSACTION`, `COMMIT`, `ROLLBACK`) can manage transactions
96    /// from within `execute(&self)`.
97    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
98    /// Whether the current transaction is read-only (blocks mutations).
99    read_only_tx: parking_lot::Mutex<bool>,
100    /// Whether the session is in auto-commit mode.
101    auto_commit: bool,
102    /// Adaptive execution configuration.
103    #[allow(dead_code)] // Stored for future adaptive re-optimization during execution
104    adaptive_config: AdaptiveConfig,
105    /// Whether to use factorized execution for multi-hop queries.
106    factorized_execution: bool,
107    /// The graph data model this session operates on.
108    graph_model: GraphModel,
109    /// Maximum time a query may run before being cancelled.
110    query_timeout: Option<Duration>,
111    /// Shared commit counter for triggering auto-GC.
112    commit_counter: Arc<AtomicUsize>,
113    /// GC every N commits (0 = disabled).
114    gc_interval: usize,
115    /// Node count at the start of the current transaction (for PreparedCommit stats).
116    transaction_start_node_count: AtomicUsize,
117    /// Edge count at the start of the current transaction (for PreparedCommit stats).
118    transaction_start_edge_count: AtomicUsize,
119    /// WAL for logging schema changes.
120    #[cfg(feature = "wal")]
121    wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
122    /// Shared WAL graph context tracker for named graph awareness.
123    #[cfg(feature = "wal")]
124    wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
125    /// CDC log for change tracking.
126    #[cfg(feature = "cdc")]
127    cdc_log: Arc<crate::cdc::CdcLog>,
128    /// Current graph name (for multi-graph USE GRAPH support). None = default graph.
129    current_graph: parking_lot::Mutex<Option<String>>,
130    /// Current schema name (ISO/IEC 39075 Section 4.7.3: independent from session graph).
131    /// None = "not set" (uses default schema).
132    current_schema: parking_lot::Mutex<Option<String>>,
133    /// Session time zone override.
134    time_zone: parking_lot::Mutex<Option<String>>,
135    /// Session-level parameters (SET PARAMETER).
136    session_params:
137        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
138    /// Override epoch for time-travel queries (None = use transaction/current epoch).
139    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
140    /// Savepoints within the current transaction.
141    savepoints: parking_lot::Mutex<Vec<SavepointState>>,
142    /// Nesting depth for nested transactions (0 = outermost).
143    /// Nested `START TRANSACTION` creates an auto-savepoint; nested `COMMIT`
144    /// releases it, nested `ROLLBACK` rolls back to it.
145    transaction_nesting_depth: parking_lot::Mutex<u32>,
146    /// Named graphs touched during the current transaction (for cross-graph atomicity).
147    /// `None` represents the default graph. Populated at `BEGIN` time and on each
148    /// `USE GRAPH` / `SESSION SET GRAPH` switch within a transaction.
149    touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
150    /// Shared metrics registry (populated when the `metrics` feature is enabled).
151    #[cfg(feature = "metrics")]
152    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
153    /// Transaction start time for duration tracking.
154    #[cfg(feature = "metrics")]
155    tx_start_time: parking_lot::Mutex<Option<Instant>>,
156}
157
158/// Per-graph savepoint snapshot, capturing the store state at the time of the savepoint.
159#[derive(Clone)]
160struct GraphSavepoint {
161    graph_name: Option<String>,
162    next_node_id: u64,
163    next_edge_id: u64,
164    undo_log_position: usize,
165}
166
167/// Savepoint state: name + per-graph snapshots + the graph that was active.
168#[derive(Clone)]
169struct SavepointState {
170    name: String,
171    graph_snapshots: Vec<GraphSavepoint>,
172    /// The graph that was active when the savepoint was created.
173    /// Reserved for future use (e.g., restoring graph context on rollback).
174    #[allow(dead_code)]
175    active_graph: Option<String>,
176}
177
178impl Session {
179    /// Creates a new session with adaptive execution configuration.
180    #[allow(dead_code)]
181    pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
182        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
183        Self {
184            store,
185            graph_store,
186            catalog: cfg.catalog,
187            #[cfg(feature = "rdf")]
188            rdf_store: Arc::new(RdfStore::new()),
189            transaction_manager: cfg.transaction_manager,
190            query_cache: cfg.query_cache,
191            current_transaction: parking_lot::Mutex::new(None),
192            read_only_tx: parking_lot::Mutex::new(false),
193            auto_commit: true,
194            adaptive_config: cfg.adaptive_config,
195            factorized_execution: cfg.factorized_execution,
196            graph_model: cfg.graph_model,
197            query_timeout: cfg.query_timeout,
198            commit_counter: cfg.commit_counter,
199            gc_interval: cfg.gc_interval,
200            transaction_start_node_count: AtomicUsize::new(0),
201            transaction_start_edge_count: AtomicUsize::new(0),
202            #[cfg(feature = "wal")]
203            wal: None,
204            #[cfg(feature = "wal")]
205            wal_graph_context: None,
206            #[cfg(feature = "cdc")]
207            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
208            current_graph: parking_lot::Mutex::new(None),
209            current_schema: parking_lot::Mutex::new(None),
210            time_zone: parking_lot::Mutex::new(None),
211            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
212            viewing_epoch_override: parking_lot::Mutex::new(None),
213            savepoints: parking_lot::Mutex::new(Vec::new()),
214            transaction_nesting_depth: parking_lot::Mutex::new(0),
215            touched_graphs: parking_lot::Mutex::new(Vec::new()),
216            #[cfg(feature = "metrics")]
217            metrics: None,
218            #[cfg(feature = "metrics")]
219            tx_start_time: parking_lot::Mutex::new(None),
220        }
221    }
222
223    /// Sets the WAL for this session (shared with the database).
224    ///
225    /// This also wraps `graph_store` in a [`WalGraphStore`] so that mutation
226    /// operators (INSERT, DELETE, SET via queries) log to the WAL.
227    #[cfg(feature = "wal")]
228    pub(crate) fn set_wal(
229        &mut self,
230        wal: Arc<grafeo_adapters::storage::wal::LpgWal>,
231        wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
232    ) {
233        // Wrap the graph store so query-engine mutations are WAL-logged
234        self.graph_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
235            Arc::clone(&self.store),
236            Arc::clone(&wal),
237            Arc::clone(&wal_graph_context),
238        ));
239        self.wal = Some(wal);
240        self.wal_graph_context = Some(wal_graph_context);
241    }
242
243    /// Sets the CDC log for this session (shared with the database).
244    #[cfg(feature = "cdc")]
245    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
246        self.cdc_log = cdc_log;
247    }
248
249    /// Sets the metrics registry for this session (shared with the database).
250    #[cfg(feature = "metrics")]
251    pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
252        self.metrics = Some(metrics);
253    }
254
255    /// Creates a session backed by an external graph store.
256    ///
257    /// The external store handles all data operations. Transaction management
258    /// (begin/commit/rollback) is not supported for external stores.
259    ///
260    /// # Errors
261    ///
262    /// Returns an error if the internal arena allocation fails (out of memory).
263    pub(crate) fn with_external_store(
264        store: Arc<dyn GraphStoreMut>,
265        cfg: SessionConfig,
266    ) -> Result<Self> {
267        Ok(Self {
268            store: Arc::new(LpgStore::new()?),
269            graph_store: store,
270            catalog: cfg.catalog,
271            #[cfg(feature = "rdf")]
272            rdf_store: Arc::new(RdfStore::new()),
273            transaction_manager: cfg.transaction_manager,
274            query_cache: cfg.query_cache,
275            current_transaction: parking_lot::Mutex::new(None),
276            read_only_tx: parking_lot::Mutex::new(false),
277            auto_commit: true,
278            adaptive_config: cfg.adaptive_config,
279            factorized_execution: cfg.factorized_execution,
280            graph_model: cfg.graph_model,
281            query_timeout: cfg.query_timeout,
282            commit_counter: cfg.commit_counter,
283            gc_interval: cfg.gc_interval,
284            transaction_start_node_count: AtomicUsize::new(0),
285            transaction_start_edge_count: AtomicUsize::new(0),
286            #[cfg(feature = "wal")]
287            wal: None,
288            #[cfg(feature = "wal")]
289            wal_graph_context: None,
290            #[cfg(feature = "cdc")]
291            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
292            current_graph: parking_lot::Mutex::new(None),
293            current_schema: parking_lot::Mutex::new(None),
294            time_zone: parking_lot::Mutex::new(None),
295            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
296            viewing_epoch_override: parking_lot::Mutex::new(None),
297            savepoints: parking_lot::Mutex::new(Vec::new()),
298            transaction_nesting_depth: parking_lot::Mutex::new(0),
299            touched_graphs: parking_lot::Mutex::new(Vec::new()),
300            #[cfg(feature = "metrics")]
301            metrics: None,
302            #[cfg(feature = "metrics")]
303            tx_start_time: parking_lot::Mutex::new(None),
304        })
305    }
306
307    /// Returns the graph model this session operates on.
308    #[must_use]
309    pub fn graph_model(&self) -> GraphModel {
310        self.graph_model
311    }
312
313    // === Session State Management ===
314
315    /// Sets the current graph for this session (USE GRAPH).
316    pub fn use_graph(&self, name: &str) {
317        *self.current_graph.lock() = Some(name.to_string());
318    }
319
320    /// Returns the current graph name, if any.
321    #[must_use]
322    pub fn current_graph(&self) -> Option<String> {
323        self.current_graph.lock().clone()
324    }
325
326    /// Sets the current schema for this session (SESSION SET SCHEMA).
327    ///
328    /// Per ISO/IEC 39075 Section 7.1 GR1, this is independent of the session graph.
329    pub fn set_schema(&self, name: &str) {
330        *self.current_schema.lock() = Some(name.to_string());
331    }
332
333    /// Returns the current schema name, if any.
334    ///
335    /// `None` means "not set", which resolves to the default schema.
336    #[must_use]
337    pub fn current_schema(&self) -> Option<String> {
338        self.current_schema.lock().clone()
339    }
340
341    /// Computes the effective storage key for a graph, accounting for schema context.
342    ///
343    /// Per ISO/IEC 39075 Section 17.2, graphs resolve relative to the current schema.
344    /// Uses `/` as separator since it is invalid in GQL identifiers.
345    fn effective_graph_key(&self, graph_name: &str) -> String {
346        let schema = self.current_schema.lock().clone();
347        match schema {
348            Some(s) => format!("{s}/{graph_name}"),
349            None => graph_name.to_string(),
350        }
351    }
352
353    /// Returns the effective storage key for the current graph, accounting for schema.
354    ///
355    /// Combines `current_schema` and `current_graph` into a flat lookup key.
356    fn active_graph_storage_key(&self) -> Option<String> {
357        let graph = self.current_graph.lock().clone();
358        let schema = self.current_schema.lock().clone();
359        match (schema, graph) {
360            (_, None) => None,
361            (_, Some(ref name)) if name.eq_ignore_ascii_case("default") => None,
362            (None, Some(name)) => Some(name),
363            (Some(s), Some(g)) => Some(format!("{s}/{g}")),
364        }
365    }
366
367    /// Returns the graph store for the currently active graph.
368    ///
369    /// If `current_graph` is `None` or `"default"`, returns the session's
370    /// default `graph_store` (already WAL-wrapped for the default graph).
371    /// Otherwise looks up the named graph in the root store and wraps it
372    /// in a [`WalGraphStore`] so mutations are WAL-logged with the correct
373    /// graph context.
374    fn active_store(&self) -> Arc<dyn GraphStoreMut> {
375        let key = self.active_graph_storage_key();
376        match key {
377            None => Arc::clone(&self.graph_store),
378            Some(ref name) => match self.store.graph(name) {
379                Some(named_store) => {
380                    #[cfg(feature = "wal")]
381                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
382                        return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
383                            named_store,
384                            Arc::clone(wal),
385                            name.clone(),
386                            Arc::clone(ctx),
387                        )) as Arc<dyn GraphStoreMut>;
388                    }
389                    named_store as Arc<dyn GraphStoreMut>
390                }
391                None => Arc::clone(&self.graph_store),
392            },
393        }
394    }
395
396    /// Returns the concrete `LpgStore` for the currently active graph.
397    ///
398    /// Used by direct CRUD methods that need the concrete store type
399    /// for versioned operations.
400    fn active_lpg_store(&self) -> Arc<LpgStore> {
401        let key = self.active_graph_storage_key();
402        match key {
403            None => Arc::clone(&self.store),
404            Some(ref name) => self
405                .store
406                .graph(name)
407                .unwrap_or_else(|| Arc::clone(&self.store)),
408        }
409    }
410
411    /// Resolves a graph name to a concrete `LpgStore`.
412    /// `None` and `"default"` resolve to the session's root store.
413    fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
414        match graph_name {
415            None => Arc::clone(&self.store),
416            Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
417            Some(name) => self
418                .store
419                .graph(name)
420                .unwrap_or_else(|| Arc::clone(&self.store)),
421        }
422    }
423
424    /// Records the current graph as "touched" if a transaction is active.
425    ///
426    /// Uses the full storage key (schema/graph) so that commit/rollback
427    /// can resolve the correct store via `resolve_store`.
428    fn track_graph_touch(&self) {
429        if self.current_transaction.lock().is_some() {
430            let key = self.active_graph_storage_key();
431            let mut touched = self.touched_graphs.lock();
432            if !touched.contains(&key) {
433                touched.push(key);
434            }
435        }
436    }
437
438    /// Sets the session time zone.
439    pub fn set_time_zone(&self, tz: &str) {
440        *self.time_zone.lock() = Some(tz.to_string());
441    }
442
443    /// Returns the session time zone, if set.
444    #[must_use]
445    pub fn time_zone(&self) -> Option<String> {
446        self.time_zone.lock().clone()
447    }
448
449    /// Sets a session parameter.
450    pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
451        self.session_params.lock().insert(key.to_string(), value);
452    }
453
454    /// Gets a session parameter by cloning it.
455    #[must_use]
456    pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
457        self.session_params.lock().get(key).cloned()
458    }
459
460    /// Resets all session state to defaults (ISO/IEC 39075 Section 7.2).
461    pub fn reset_session(&self) {
462        *self.current_schema.lock() = None;
463        *self.current_graph.lock() = None;
464        *self.time_zone.lock() = None;
465        self.session_params.lock().clear();
466        *self.viewing_epoch_override.lock() = None;
467    }
468
469    /// Resets only the session schema (Section 7.2 GR1).
470    pub fn reset_schema(&self) {
471        *self.current_schema.lock() = None;
472    }
473
474    /// Resets only the session graph (Section 7.2 GR2).
475    pub fn reset_graph(&self) {
476        *self.current_graph.lock() = None;
477    }
478
479    /// Resets only the session time zone (Section 7.2 GR3).
480    pub fn reset_time_zone(&self) {
481        *self.time_zone.lock() = None;
482    }
483
484    /// Resets only session parameters (Section 7.2 GR4).
485    pub fn reset_parameters(&self) {
486        self.session_params.lock().clear();
487    }
488
489    // --- Time-travel API ---
490
491    /// Sets a viewing epoch override for time-travel queries.
492    ///
493    /// While set, all queries on this session see the database as it existed
494    /// at the given epoch. Use [`clear_viewing_epoch`](Self::clear_viewing_epoch)
495    /// to return to normal behavior.
496    pub fn set_viewing_epoch(&self, epoch: EpochId) {
497        *self.viewing_epoch_override.lock() = Some(epoch);
498    }
499
500    /// Clears the viewing epoch override, returning to normal behavior.
501    pub fn clear_viewing_epoch(&self) {
502        *self.viewing_epoch_override.lock() = None;
503    }
504
505    /// Returns the current viewing epoch override, if any.
506    #[must_use]
507    pub fn viewing_epoch(&self) -> Option<EpochId> {
508        *self.viewing_epoch_override.lock()
509    }
510
511    /// Returns all versions of a node with their creation/deletion epochs.
512    ///
513    /// Properties and labels reflect the current state (not versioned per-epoch).
514    #[must_use]
515    pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
516        self.active_lpg_store().get_node_history(id)
517    }
518
519    /// Returns all versions of an edge with their creation/deletion epochs.
520    ///
521    /// Properties reflect the current state (not versioned per-epoch).
522    #[must_use]
523    pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
524        self.active_lpg_store().get_edge_history(id)
525    }
526
527    /// Checks that the session's graph model supports LPG operations.
528    fn require_lpg(&self, language: &str) -> Result<()> {
529        if self.graph_model == GraphModel::Rdf {
530            return Err(grafeo_common::utils::error::Error::Internal(format!(
531                "This is an RDF database. {language} queries require an LPG database."
532            )));
533        }
534        Ok(())
535    }
536
537    /// Executes a session or transaction command, returning an empty result.
538    #[cfg(feature = "gql")]
539    fn execute_session_command(
540        &self,
541        cmd: grafeo_adapters::query::gql::ast::SessionCommand,
542    ) -> Result<QueryResult> {
543        use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
544        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
545
546        // Block DDL in read-only transactions (ISO/IEC 39075 Section 8)
547        if *self.read_only_tx.lock() {
548            match &cmd {
549                SessionCommand::CreateGraph { .. } | SessionCommand::DropGraph { .. } => {
550                    return Err(Error::Transaction(
551                        grafeo_common::utils::error::TransactionError::ReadOnly,
552                    ));
553                }
554                _ => {} // Session state + transaction control allowed
555            }
556        }
557
558        match cmd {
559            SessionCommand::CreateGraph {
560                name,
561                if_not_exists,
562                typed,
563                like_graph,
564                copy_of,
565                open: _,
566            } => {
567                // ISO/IEC 39075 Section 12.4: graphs are created within the current schema
568                let storage_key = self.effective_graph_key(&name);
569
570                // Validate source graph exists for LIKE / AS COPY OF
571                if let Some(ref src) = like_graph {
572                    let src_key = self.effective_graph_key(src);
573                    if self.store.graph(&src_key).is_none() {
574                        return Err(Error::Query(QueryError::new(
575                            QueryErrorKind::Semantic,
576                            format!("Source graph '{src}' does not exist"),
577                        )));
578                    }
579                }
580                if let Some(ref src) = copy_of {
581                    let src_key = self.effective_graph_key(src);
582                    if self.store.graph(&src_key).is_none() {
583                        return Err(Error::Query(QueryError::new(
584                            QueryErrorKind::Semantic,
585                            format!("Source graph '{src}' does not exist"),
586                        )));
587                    }
588                }
589
590                let created = self
591                    .store
592                    .create_graph(&storage_key)
593                    .map_err(|e| Error::Internal(e.to_string()))?;
594                if !created && !if_not_exists {
595                    return Err(Error::Query(QueryError::new(
596                        QueryErrorKind::Semantic,
597                        format!("Graph '{name}' already exists"),
598                    )));
599                }
600                if created {
601                    #[cfg(feature = "wal")]
602                    self.log_schema_wal(
603                        &grafeo_adapters::storage::wal::WalRecord::CreateNamedGraph {
604                            name: storage_key.clone(),
605                        },
606                    );
607                }
608
609                // AS COPY OF: copy data from source graph
610                if let Some(ref src) = copy_of {
611                    let src_key = self.effective_graph_key(src);
612                    self.store
613                        .copy_graph(Some(&src_key), Some(&storage_key))
614                        .map_err(|e| Error::Internal(e.to_string()))?;
615                }
616
617                // Bind to graph type if specified
618                if let Some(type_name) = typed
619                    && let Err(e) = self
620                        .catalog
621                        .bind_graph_type(&storage_key, type_name.clone())
622                {
623                    return Err(Error::Query(QueryError::new(
624                        QueryErrorKind::Semantic,
625                        e.to_string(),
626                    )));
627                }
628
629                // LIKE: copy graph type binding from source
630                if let Some(ref src) = like_graph {
631                    let src_key = self.effective_graph_key(src);
632                    if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
633                        let _ = self.catalog.bind_graph_type(&storage_key, src_type);
634                    }
635                }
636
637                Ok(QueryResult::empty())
638            }
639            SessionCommand::DropGraph { name, if_exists } => {
640                let storage_key = self.effective_graph_key(&name);
641                let dropped = self.store.drop_graph(&storage_key);
642                if !dropped && !if_exists {
643                    return Err(Error::Query(QueryError::new(
644                        QueryErrorKind::Semantic,
645                        format!("Graph '{name}' does not exist"),
646                    )));
647                }
648                if dropped {
649                    #[cfg(feature = "wal")]
650                    self.log_schema_wal(
651                        &grafeo_adapters::storage::wal::WalRecord::DropNamedGraph {
652                            name: storage_key.clone(),
653                        },
654                    );
655                    // If this session was using the dropped graph, reset to default
656                    let mut current = self.current_graph.lock();
657                    if current
658                        .as_deref()
659                        .is_some_and(|g| g.eq_ignore_ascii_case(&name))
660                    {
661                        *current = None;
662                    }
663                }
664                Ok(QueryResult::empty())
665            }
666            SessionCommand::UseGraph(name) => {
667                // Verify graph exists (resolve within current schema)
668                let effective_key = self.effective_graph_key(&name);
669                if !name.eq_ignore_ascii_case("default")
670                    && self.store.graph(&effective_key).is_none()
671                {
672                    return Err(Error::Query(QueryError::new(
673                        QueryErrorKind::Semantic,
674                        format!("Graph '{name}' does not exist"),
675                    )));
676                }
677                self.use_graph(&name);
678                // Track the new graph if in a transaction
679                self.track_graph_touch();
680                Ok(QueryResult::empty())
681            }
682            SessionCommand::SessionSetGraph(name) => {
683                // ISO/IEC 39075 Section 7.1 GR2: set session graph (resolved within current schema)
684                let effective_key = self.effective_graph_key(&name);
685                if !name.eq_ignore_ascii_case("default")
686                    && self.store.graph(&effective_key).is_none()
687                {
688                    return Err(Error::Query(QueryError::new(
689                        QueryErrorKind::Semantic,
690                        format!("Graph '{name}' does not exist"),
691                    )));
692                }
693                self.use_graph(&name);
694                // Track the new graph if in a transaction
695                self.track_graph_touch();
696                Ok(QueryResult::empty())
697            }
698            SessionCommand::SessionSetSchema(name) => {
699                // ISO/IEC 39075 Section 7.1 GR1: set session schema (independent of graph)
700                if !self.catalog.schema_exists(&name) {
701                    return Err(Error::Query(QueryError::new(
702                        QueryErrorKind::Semantic,
703                        format!("Schema '{name}' does not exist"),
704                    )));
705                }
706                self.set_schema(&name);
707                Ok(QueryResult::empty())
708            }
709            SessionCommand::SessionSetTimeZone(tz) => {
710                self.set_time_zone(&tz);
711                Ok(QueryResult::empty())
712            }
713            SessionCommand::SessionSetParameter(key, expr) => {
714                if key.eq_ignore_ascii_case("viewing_epoch") {
715                    match Self::eval_integer_literal(&expr) {
716                        Some(n) if n >= 0 => {
717                            self.set_viewing_epoch(EpochId::new(n as u64));
718                            Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
719                        }
720                        _ => Err(Error::Query(QueryError::new(
721                            QueryErrorKind::Semantic,
722                            "viewing_epoch must be a non-negative integer literal",
723                        ))),
724                    }
725                } else {
726                    // For now, store parameter name with Null value.
727                    // Full expression evaluation would require building and executing a plan.
728                    self.set_parameter(&key, Value::Null);
729                    Ok(QueryResult::empty())
730                }
731            }
732            SessionCommand::SessionReset(target) => {
733                use grafeo_adapters::query::gql::ast::SessionResetTarget;
734                match target {
735                    SessionResetTarget::All => self.reset_session(),
736                    SessionResetTarget::Schema => self.reset_schema(),
737                    SessionResetTarget::Graph => self.reset_graph(),
738                    SessionResetTarget::TimeZone => self.reset_time_zone(),
739                    SessionResetTarget::Parameters => self.reset_parameters(),
740                }
741                Ok(QueryResult::empty())
742            }
743            SessionCommand::SessionClose => {
744                self.reset_session();
745                Ok(QueryResult::empty())
746            }
747            SessionCommand::StartTransaction {
748                read_only,
749                isolation_level,
750            } => {
751                let engine_level = isolation_level.map(|l| match l {
752                    TransactionIsolationLevel::ReadCommitted => {
753                        crate::transaction::IsolationLevel::ReadCommitted
754                    }
755                    TransactionIsolationLevel::SnapshotIsolation => {
756                        crate::transaction::IsolationLevel::SnapshotIsolation
757                    }
758                    TransactionIsolationLevel::Serializable => {
759                        crate::transaction::IsolationLevel::Serializable
760                    }
761                });
762                self.begin_transaction_inner(read_only, engine_level)?;
763                Ok(QueryResult::status("Transaction started"))
764            }
765            SessionCommand::Commit => {
766                self.commit_inner()?;
767                Ok(QueryResult::status("Transaction committed"))
768            }
769            SessionCommand::Rollback => {
770                self.rollback_inner()?;
771                Ok(QueryResult::status("Transaction rolled back"))
772            }
773            SessionCommand::Savepoint(name) => {
774                self.savepoint(&name)?;
775                Ok(QueryResult::status(format!("Savepoint '{name}' created")))
776            }
777            SessionCommand::RollbackToSavepoint(name) => {
778                self.rollback_to_savepoint(&name)?;
779                Ok(QueryResult::status(format!(
780                    "Rolled back to savepoint '{name}'"
781                )))
782            }
783            SessionCommand::ReleaseSavepoint(name) => {
784                self.release_savepoint(&name)?;
785                Ok(QueryResult::status(format!("Savepoint '{name}' released")))
786            }
787        }
788    }
789
790    /// Logs a WAL record for a schema change (no-op if WAL is not enabled).
791    #[cfg(feature = "wal")]
792    fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
793        if let Some(ref wal) = self.wal
794            && let Err(e) = wal.log(record)
795        {
796            tracing::warn!("Failed to log schema change to WAL: {}", e);
797        }
798    }
799
800    /// Executes a schema DDL command, returning a status result.
801    #[cfg(feature = "gql")]
802    fn execute_schema_command(
803        &self,
804        cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
805    ) -> Result<QueryResult> {
806        use crate::catalog::{
807            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
808        };
809        use grafeo_adapters::query::gql::ast::SchemaStatement;
810        #[cfg(feature = "wal")]
811        use grafeo_adapters::storage::wal::WalRecord;
812        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
813
814        /// Logs a WAL record for schema changes. Compiles to nothing without `wal`.
815        macro_rules! wal_log {
816            ($self:expr, $record:expr) => {
817                #[cfg(feature = "wal")]
818                $self.log_schema_wal(&$record);
819            };
820        }
821
822        let result = match cmd {
823            SchemaStatement::CreateNodeType(stmt) => {
824                #[cfg(feature = "wal")]
825                let props_for_wal: Vec<(String, String, bool)> = stmt
826                    .properties
827                    .iter()
828                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
829                    .collect();
830                let def = NodeTypeDefinition {
831                    name: stmt.name.clone(),
832                    properties: stmt
833                        .properties
834                        .iter()
835                        .map(|p| TypedProperty {
836                            name: p.name.clone(),
837                            data_type: PropertyDataType::from_type_name(&p.data_type),
838                            nullable: p.nullable,
839                            default_value: p
840                                .default_value
841                                .as_ref()
842                                .map(|s| parse_default_literal(s)),
843                        })
844                        .collect(),
845                    constraints: Vec::new(),
846                    parent_types: stmt.parent_types.clone(),
847                };
848                let result = if stmt.or_replace {
849                    let _ = self.catalog.drop_node_type(&stmt.name);
850                    self.catalog.register_node_type(def)
851                } else {
852                    self.catalog.register_node_type(def)
853                };
854                match result {
855                    Ok(()) => {
856                        wal_log!(
857                            self,
858                            WalRecord::CreateNodeType {
859                                name: stmt.name.clone(),
860                                properties: props_for_wal,
861                                constraints: Vec::new(),
862                            }
863                        );
864                        Ok(QueryResult::status(format!(
865                            "Created node type '{}'",
866                            stmt.name
867                        )))
868                    }
869                    Err(e) if stmt.if_not_exists => {
870                        let _ = e;
871                        Ok(QueryResult::status("No change"))
872                    }
873                    Err(e) => Err(Error::Query(QueryError::new(
874                        QueryErrorKind::Semantic,
875                        e.to_string(),
876                    ))),
877                }
878            }
879            SchemaStatement::CreateEdgeType(stmt) => {
880                #[cfg(feature = "wal")]
881                let props_for_wal: Vec<(String, String, bool)> = stmt
882                    .properties
883                    .iter()
884                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
885                    .collect();
886                let def = EdgeTypeDefinition {
887                    name: stmt.name.clone(),
888                    properties: stmt
889                        .properties
890                        .iter()
891                        .map(|p| TypedProperty {
892                            name: p.name.clone(),
893                            data_type: PropertyDataType::from_type_name(&p.data_type),
894                            nullable: p.nullable,
895                            default_value: p
896                                .default_value
897                                .as_ref()
898                                .map(|s| parse_default_literal(s)),
899                        })
900                        .collect(),
901                    constraints: Vec::new(),
902                    source_node_types: stmt.source_node_types.clone(),
903                    target_node_types: stmt.target_node_types.clone(),
904                };
905                let result = if stmt.or_replace {
906                    let _ = self.catalog.drop_edge_type_def(&stmt.name);
907                    self.catalog.register_edge_type_def(def)
908                } else {
909                    self.catalog.register_edge_type_def(def)
910                };
911                match result {
912                    Ok(()) => {
913                        wal_log!(
914                            self,
915                            WalRecord::CreateEdgeType {
916                                name: stmt.name.clone(),
917                                properties: props_for_wal,
918                                constraints: Vec::new(),
919                            }
920                        );
921                        Ok(QueryResult::status(format!(
922                            "Created edge type '{}'",
923                            stmt.name
924                        )))
925                    }
926                    Err(e) if stmt.if_not_exists => {
927                        let _ = e;
928                        Ok(QueryResult::status("No change"))
929                    }
930                    Err(e) => Err(Error::Query(QueryError::new(
931                        QueryErrorKind::Semantic,
932                        e.to_string(),
933                    ))),
934                }
935            }
936            SchemaStatement::CreateVectorIndex(stmt) => {
937                Self::create_vector_index_on_store(
938                    &self.active_lpg_store(),
939                    &stmt.node_label,
940                    &stmt.property,
941                    stmt.dimensions,
942                    stmt.metric.as_deref(),
943                )?;
944                wal_log!(
945                    self,
946                    WalRecord::CreateIndex {
947                        name: stmt.name.clone(),
948                        label: stmt.node_label.clone(),
949                        property: stmt.property.clone(),
950                        index_type: "vector".to_string(),
951                    }
952                );
953                Ok(QueryResult::status(format!(
954                    "Created vector index '{}'",
955                    stmt.name
956                )))
957            }
958            SchemaStatement::DropNodeType { name, if_exists } => {
959                match self.catalog.drop_node_type(&name) {
960                    Ok(()) => {
961                        wal_log!(self, WalRecord::DropNodeType { name: name.clone() });
962                        Ok(QueryResult::status(format!("Dropped node type '{name}'")))
963                    }
964                    Err(e) if if_exists => {
965                        let _ = e;
966                        Ok(QueryResult::status("No change"))
967                    }
968                    Err(e) => Err(Error::Query(QueryError::new(
969                        QueryErrorKind::Semantic,
970                        e.to_string(),
971                    ))),
972                }
973            }
974            SchemaStatement::DropEdgeType { name, if_exists } => {
975                match self.catalog.drop_edge_type_def(&name) {
976                    Ok(()) => {
977                        wal_log!(self, WalRecord::DropEdgeType { name: name.clone() });
978                        Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
979                    }
980                    Err(e) if if_exists => {
981                        let _ = e;
982                        Ok(QueryResult::status("No change"))
983                    }
984                    Err(e) => Err(Error::Query(QueryError::new(
985                        QueryErrorKind::Semantic,
986                        e.to_string(),
987                    ))),
988                }
989            }
990            SchemaStatement::CreateIndex(stmt) => {
991                use grafeo_adapters::query::gql::ast::IndexKind;
992                let active = self.active_lpg_store();
993                let index_type_str = match stmt.index_kind {
994                    IndexKind::Property => "property",
995                    IndexKind::BTree => "btree",
996                    IndexKind::Text => "text",
997                    IndexKind::Vector => "vector",
998                };
999                match stmt.index_kind {
1000                    IndexKind::Property | IndexKind::BTree => {
1001                        for prop in &stmt.properties {
1002                            active.create_property_index(prop);
1003                        }
1004                    }
1005                    IndexKind::Text => {
1006                        for prop in &stmt.properties {
1007                            Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1008                        }
1009                    }
1010                    IndexKind::Vector => {
1011                        for prop in &stmt.properties {
1012                            Self::create_vector_index_on_store(
1013                                &active,
1014                                &stmt.label,
1015                                prop,
1016                                stmt.options.dimensions,
1017                                stmt.options.metric.as_deref(),
1018                            )?;
1019                        }
1020                    }
1021                }
1022                #[cfg(feature = "wal")]
1023                for prop in &stmt.properties {
1024                    wal_log!(
1025                        self,
1026                        WalRecord::CreateIndex {
1027                            name: stmt.name.clone(),
1028                            label: stmt.label.clone(),
1029                            property: prop.clone(),
1030                            index_type: index_type_str.to_string(),
1031                        }
1032                    );
1033                }
1034                Ok(QueryResult::status(format!(
1035                    "Created {} index '{}'",
1036                    index_type_str, stmt.name
1037                )))
1038            }
1039            SchemaStatement::DropIndex { name, if_exists } => {
1040                // Try to drop property index by name
1041                let dropped = self.active_lpg_store().drop_property_index(&name);
1042                if dropped || if_exists {
1043                    if dropped {
1044                        wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1045                    }
1046                    Ok(QueryResult::status(if dropped {
1047                        format!("Dropped index '{name}'")
1048                    } else {
1049                        "No change".to_string()
1050                    }))
1051                } else {
1052                    Err(Error::Query(QueryError::new(
1053                        QueryErrorKind::Semantic,
1054                        format!("Index '{name}' does not exist"),
1055                    )))
1056                }
1057            }
1058            SchemaStatement::CreateConstraint(stmt) => {
1059                use crate::catalog::TypeConstraint;
1060                use grafeo_adapters::query::gql::ast::ConstraintKind;
1061                let kind_str = match stmt.constraint_kind {
1062                    ConstraintKind::Unique => "unique",
1063                    ConstraintKind::NodeKey => "node_key",
1064                    ConstraintKind::NotNull => "not_null",
1065                    ConstraintKind::Exists => "exists",
1066                };
1067                let constraint_name = stmt
1068                    .name
1069                    .clone()
1070                    .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1071
1072                // Register constraint in catalog type definitions
1073                match stmt.constraint_kind {
1074                    ConstraintKind::Unique => {
1075                        for prop in &stmt.properties {
1076                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1077                            let prop_id = self.catalog.get_or_create_property_key(prop);
1078                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1079                        }
1080                        let _ = self.catalog.add_constraint_to_type(
1081                            &stmt.label,
1082                            TypeConstraint::Unique(stmt.properties.clone()),
1083                        );
1084                    }
1085                    ConstraintKind::NodeKey => {
1086                        for prop in &stmt.properties {
1087                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1088                            let prop_id = self.catalog.get_or_create_property_key(prop);
1089                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1090                            let _ = self.catalog.add_required_property(label_id, prop_id);
1091                        }
1092                        let _ = self.catalog.add_constraint_to_type(
1093                            &stmt.label,
1094                            TypeConstraint::PrimaryKey(stmt.properties.clone()),
1095                        );
1096                    }
1097                    ConstraintKind::NotNull | ConstraintKind::Exists => {
1098                        for prop in &stmt.properties {
1099                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1100                            let prop_id = self.catalog.get_or_create_property_key(prop);
1101                            let _ = self.catalog.add_required_property(label_id, prop_id);
1102                            let _ = self.catalog.add_constraint_to_type(
1103                                &stmt.label,
1104                                TypeConstraint::NotNull(prop.clone()),
1105                            );
1106                        }
1107                    }
1108                }
1109
1110                wal_log!(
1111                    self,
1112                    WalRecord::CreateConstraint {
1113                        name: constraint_name.clone(),
1114                        label: stmt.label.clone(),
1115                        properties: stmt.properties.clone(),
1116                        kind: kind_str.to_string(),
1117                    }
1118                );
1119                Ok(QueryResult::status(format!(
1120                    "Created {kind_str} constraint '{constraint_name}'"
1121                )))
1122            }
1123            SchemaStatement::DropConstraint { name, if_exists } => {
1124                let _ = if_exists;
1125                wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1126                Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1127            }
1128            SchemaStatement::CreateGraphType(stmt) => {
1129                use crate::catalog::GraphTypeDefinition;
1130                use grafeo_adapters::query::gql::ast::InlineElementType;
1131
1132                // GG04: LIKE clause copies type from existing graph
1133                let (mut node_types, mut edge_types, open) =
1134                    if let Some(ref like_graph) = stmt.like_graph {
1135                        // Infer types from the graph's bound type, or use its existing types
1136                        if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1137                            if let Some(existing) = self
1138                                .catalog
1139                                .schema()
1140                                .and_then(|s| s.get_graph_type(&type_name))
1141                            {
1142                                (
1143                                    existing.allowed_node_types.clone(),
1144                                    existing.allowed_edge_types.clone(),
1145                                    existing.open,
1146                                )
1147                            } else {
1148                                (Vec::new(), Vec::new(), true)
1149                            }
1150                        } else {
1151                            // GG22: Infer from graph data (labels used in graph)
1152                            let nt = self.catalog.all_node_type_names();
1153                            let et = self.catalog.all_edge_type_names();
1154                            if nt.is_empty() && et.is_empty() {
1155                                (Vec::new(), Vec::new(), true)
1156                            } else {
1157                                (nt, et, false)
1158                            }
1159                        }
1160                    } else {
1161                        (stmt.node_types.clone(), stmt.edge_types.clone(), stmt.open)
1162                    };
1163
1164                // GG03: Register inline element types and add their names
1165                for inline in &stmt.inline_types {
1166                    match inline {
1167                        InlineElementType::Node {
1168                            name,
1169                            properties,
1170                            key_labels,
1171                            ..
1172                        } => {
1173                            let def = NodeTypeDefinition {
1174                                name: name.clone(),
1175                                properties: properties
1176                                    .iter()
1177                                    .map(|p| TypedProperty {
1178                                        name: p.name.clone(),
1179                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1180                                        nullable: p.nullable,
1181                                        default_value: None,
1182                                    })
1183                                    .collect(),
1184                                constraints: Vec::new(),
1185                                parent_types: key_labels.clone(),
1186                            };
1187                            // Register or replace so inline defs override existing
1188                            self.catalog.register_or_replace_node_type(def);
1189                            #[cfg(feature = "wal")]
1190                            {
1191                                let props_for_wal: Vec<(String, String, bool)> = properties
1192                                    .iter()
1193                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1194                                    .collect();
1195                                self.log_schema_wal(&WalRecord::CreateNodeType {
1196                                    name: name.clone(),
1197                                    properties: props_for_wal,
1198                                    constraints: Vec::new(),
1199                                });
1200                            }
1201                            if !node_types.contains(name) {
1202                                node_types.push(name.clone());
1203                            }
1204                        }
1205                        InlineElementType::Edge {
1206                            name,
1207                            properties,
1208                            source_node_types,
1209                            target_node_types,
1210                            ..
1211                        } => {
1212                            let def = EdgeTypeDefinition {
1213                                name: name.clone(),
1214                                properties: properties
1215                                    .iter()
1216                                    .map(|p| TypedProperty {
1217                                        name: p.name.clone(),
1218                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1219                                        nullable: p.nullable,
1220                                        default_value: None,
1221                                    })
1222                                    .collect(),
1223                                constraints: Vec::new(),
1224                                source_node_types: source_node_types.clone(),
1225                                target_node_types: target_node_types.clone(),
1226                            };
1227                            self.catalog.register_or_replace_edge_type_def(def);
1228                            #[cfg(feature = "wal")]
1229                            {
1230                                let props_for_wal: Vec<(String, String, bool)> = properties
1231                                    .iter()
1232                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1233                                    .collect();
1234                                self.log_schema_wal(&WalRecord::CreateEdgeType {
1235                                    name: name.clone(),
1236                                    properties: props_for_wal,
1237                                    constraints: Vec::new(),
1238                                });
1239                            }
1240                            if !edge_types.contains(name) {
1241                                edge_types.push(name.clone());
1242                            }
1243                        }
1244                    }
1245                }
1246
1247                let def = GraphTypeDefinition {
1248                    name: stmt.name.clone(),
1249                    allowed_node_types: node_types.clone(),
1250                    allowed_edge_types: edge_types.clone(),
1251                    open,
1252                };
1253                let result = if stmt.or_replace {
1254                    // Drop existing first, ignore error if not found
1255                    let _ = self.catalog.drop_graph_type(&stmt.name);
1256                    self.catalog.register_graph_type(def)
1257                } else {
1258                    self.catalog.register_graph_type(def)
1259                };
1260                match result {
1261                    Ok(()) => {
1262                        wal_log!(
1263                            self,
1264                            WalRecord::CreateGraphType {
1265                                name: stmt.name.clone(),
1266                                node_types,
1267                                edge_types,
1268                                open,
1269                            }
1270                        );
1271                        Ok(QueryResult::status(format!(
1272                            "Created graph type '{}'",
1273                            stmt.name
1274                        )))
1275                    }
1276                    Err(e) if stmt.if_not_exists => {
1277                        let _ = e;
1278                        Ok(QueryResult::status("No change"))
1279                    }
1280                    Err(e) => Err(Error::Query(QueryError::new(
1281                        QueryErrorKind::Semantic,
1282                        e.to_string(),
1283                    ))),
1284                }
1285            }
1286            SchemaStatement::DropGraphType { name, if_exists } => {
1287                match self.catalog.drop_graph_type(&name) {
1288                    Ok(()) => {
1289                        wal_log!(self, WalRecord::DropGraphType { name: name.clone() });
1290                        Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1291                    }
1292                    Err(e) if if_exists => {
1293                        let _ = e;
1294                        Ok(QueryResult::status("No change"))
1295                    }
1296                    Err(e) => Err(Error::Query(QueryError::new(
1297                        QueryErrorKind::Semantic,
1298                        e.to_string(),
1299                    ))),
1300                }
1301            }
1302            SchemaStatement::CreateSchema {
1303                name,
1304                if_not_exists,
1305            } => match self.catalog.register_schema_namespace(name.clone()) {
1306                Ok(()) => {
1307                    wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1308                    Ok(QueryResult::status(format!("Created schema '{name}'")))
1309                }
1310                Err(e) if if_not_exists => {
1311                    let _ = e;
1312                    Ok(QueryResult::status("No change"))
1313                }
1314                Err(e) => Err(Error::Query(QueryError::new(
1315                    QueryErrorKind::Semantic,
1316                    e.to_string(),
1317                ))),
1318            },
1319            SchemaStatement::DropSchema { name, if_exists } => {
1320                // ISO/IEC 39075 Section 12.3: schema must be empty before dropping
1321                let prefix = format!("{name}/");
1322                let has_graphs = self
1323                    .store
1324                    .graph_names()
1325                    .iter()
1326                    .any(|g| g.starts_with(&prefix));
1327                if has_graphs {
1328                    return Err(Error::Query(QueryError::new(
1329                        QueryErrorKind::Semantic,
1330                        format!(
1331                            "Schema '{name}' is not empty: drop all graphs in the schema first"
1332                        ),
1333                    )));
1334                }
1335                match self.catalog.drop_schema_namespace(&name) {
1336                    Ok(()) => {
1337                        wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1338                        // If this session was using the dropped schema, reset it
1339                        let mut current = self.current_schema.lock();
1340                        if current
1341                            .as_deref()
1342                            .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1343                        {
1344                            *current = None;
1345                        }
1346                        Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1347                    }
1348                    Err(e) if if_exists => {
1349                        let _ = e;
1350                        Ok(QueryResult::status("No change"))
1351                    }
1352                    Err(e) => Err(Error::Query(QueryError::new(
1353                        QueryErrorKind::Semantic,
1354                        e.to_string(),
1355                    ))),
1356                }
1357            }
1358            SchemaStatement::AlterNodeType(stmt) => {
1359                use grafeo_adapters::query::gql::ast::TypeAlteration;
1360                let mut wal_alts = Vec::new();
1361                for alt in &stmt.alterations {
1362                    match alt {
1363                        TypeAlteration::AddProperty(prop) => {
1364                            let typed = TypedProperty {
1365                                name: prop.name.clone(),
1366                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1367                                nullable: prop.nullable,
1368                                default_value: prop
1369                                    .default_value
1370                                    .as_ref()
1371                                    .map(|s| parse_default_literal(s)),
1372                            };
1373                            self.catalog
1374                                .alter_node_type_add_property(&stmt.name, typed)
1375                                .map_err(|e| {
1376                                    Error::Query(QueryError::new(
1377                                        QueryErrorKind::Semantic,
1378                                        e.to_string(),
1379                                    ))
1380                                })?;
1381                            wal_alts.push((
1382                                "add".to_string(),
1383                                prop.name.clone(),
1384                                prop.data_type.clone(),
1385                                prop.nullable,
1386                            ));
1387                        }
1388                        TypeAlteration::DropProperty(name) => {
1389                            self.catalog
1390                                .alter_node_type_drop_property(&stmt.name, name)
1391                                .map_err(|e| {
1392                                    Error::Query(QueryError::new(
1393                                        QueryErrorKind::Semantic,
1394                                        e.to_string(),
1395                                    ))
1396                                })?;
1397                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1398                        }
1399                    }
1400                }
1401                wal_log!(
1402                    self,
1403                    WalRecord::AlterNodeType {
1404                        name: stmt.name.clone(),
1405                        alterations: wal_alts,
1406                    }
1407                );
1408                Ok(QueryResult::status(format!(
1409                    "Altered node type '{}'",
1410                    stmt.name
1411                )))
1412            }
1413            SchemaStatement::AlterEdgeType(stmt) => {
1414                use grafeo_adapters::query::gql::ast::TypeAlteration;
1415                let mut wal_alts = Vec::new();
1416                for alt in &stmt.alterations {
1417                    match alt {
1418                        TypeAlteration::AddProperty(prop) => {
1419                            let typed = TypedProperty {
1420                                name: prop.name.clone(),
1421                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1422                                nullable: prop.nullable,
1423                                default_value: prop
1424                                    .default_value
1425                                    .as_ref()
1426                                    .map(|s| parse_default_literal(s)),
1427                            };
1428                            self.catalog
1429                                .alter_edge_type_add_property(&stmt.name, typed)
1430                                .map_err(|e| {
1431                                    Error::Query(QueryError::new(
1432                                        QueryErrorKind::Semantic,
1433                                        e.to_string(),
1434                                    ))
1435                                })?;
1436                            wal_alts.push((
1437                                "add".to_string(),
1438                                prop.name.clone(),
1439                                prop.data_type.clone(),
1440                                prop.nullable,
1441                            ));
1442                        }
1443                        TypeAlteration::DropProperty(name) => {
1444                            self.catalog
1445                                .alter_edge_type_drop_property(&stmt.name, name)
1446                                .map_err(|e| {
1447                                    Error::Query(QueryError::new(
1448                                        QueryErrorKind::Semantic,
1449                                        e.to_string(),
1450                                    ))
1451                                })?;
1452                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1453                        }
1454                    }
1455                }
1456                wal_log!(
1457                    self,
1458                    WalRecord::AlterEdgeType {
1459                        name: stmt.name.clone(),
1460                        alterations: wal_alts,
1461                    }
1462                );
1463                Ok(QueryResult::status(format!(
1464                    "Altered edge type '{}'",
1465                    stmt.name
1466                )))
1467            }
1468            SchemaStatement::AlterGraphType(stmt) => {
1469                use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1470                let mut wal_alts = Vec::new();
1471                for alt in &stmt.alterations {
1472                    match alt {
1473                        GraphTypeAlteration::AddNodeType(name) => {
1474                            self.catalog
1475                                .alter_graph_type_add_node_type(&stmt.name, name.clone())
1476                                .map_err(|e| {
1477                                    Error::Query(QueryError::new(
1478                                        QueryErrorKind::Semantic,
1479                                        e.to_string(),
1480                                    ))
1481                                })?;
1482                            wal_alts.push(("add_node_type".to_string(), name.clone()));
1483                        }
1484                        GraphTypeAlteration::DropNodeType(name) => {
1485                            self.catalog
1486                                .alter_graph_type_drop_node_type(&stmt.name, name)
1487                                .map_err(|e| {
1488                                    Error::Query(QueryError::new(
1489                                        QueryErrorKind::Semantic,
1490                                        e.to_string(),
1491                                    ))
1492                                })?;
1493                            wal_alts.push(("drop_node_type".to_string(), name.clone()));
1494                        }
1495                        GraphTypeAlteration::AddEdgeType(name) => {
1496                            self.catalog
1497                                .alter_graph_type_add_edge_type(&stmt.name, name.clone())
1498                                .map_err(|e| {
1499                                    Error::Query(QueryError::new(
1500                                        QueryErrorKind::Semantic,
1501                                        e.to_string(),
1502                                    ))
1503                                })?;
1504                            wal_alts.push(("add_edge_type".to_string(), name.clone()));
1505                        }
1506                        GraphTypeAlteration::DropEdgeType(name) => {
1507                            self.catalog
1508                                .alter_graph_type_drop_edge_type(&stmt.name, name)
1509                                .map_err(|e| {
1510                                    Error::Query(QueryError::new(
1511                                        QueryErrorKind::Semantic,
1512                                        e.to_string(),
1513                                    ))
1514                                })?;
1515                            wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1516                        }
1517                    }
1518                }
1519                wal_log!(
1520                    self,
1521                    WalRecord::AlterGraphType {
1522                        name: stmt.name.clone(),
1523                        alterations: wal_alts,
1524                    }
1525                );
1526                Ok(QueryResult::status(format!(
1527                    "Altered graph type '{}'",
1528                    stmt.name
1529                )))
1530            }
1531            SchemaStatement::CreateProcedure(stmt) => {
1532                use crate::catalog::ProcedureDefinition;
1533
1534                let def = ProcedureDefinition {
1535                    name: stmt.name.clone(),
1536                    params: stmt
1537                        .params
1538                        .iter()
1539                        .map(|p| (p.name.clone(), p.param_type.clone()))
1540                        .collect(),
1541                    returns: stmt
1542                        .returns
1543                        .iter()
1544                        .map(|r| (r.name.clone(), r.return_type.clone()))
1545                        .collect(),
1546                    body: stmt.body.clone(),
1547                };
1548
1549                if stmt.or_replace {
1550                    self.catalog.replace_procedure(def).map_err(|e| {
1551                        Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1552                    })?;
1553                } else {
1554                    match self.catalog.register_procedure(def) {
1555                        Ok(()) => {}
1556                        Err(_) if stmt.if_not_exists => {
1557                            return Ok(QueryResult::empty());
1558                        }
1559                        Err(e) => {
1560                            return Err(Error::Query(QueryError::new(
1561                                QueryErrorKind::Semantic,
1562                                e.to_string(),
1563                            )));
1564                        }
1565                    }
1566                }
1567
1568                wal_log!(
1569                    self,
1570                    WalRecord::CreateProcedure {
1571                        name: stmt.name.clone(),
1572                        params: stmt
1573                            .params
1574                            .iter()
1575                            .map(|p| (p.name.clone(), p.param_type.clone()))
1576                            .collect(),
1577                        returns: stmt
1578                            .returns
1579                            .iter()
1580                            .map(|r| (r.name.clone(), r.return_type.clone()))
1581                            .collect(),
1582                        body: stmt.body,
1583                    }
1584                );
1585                Ok(QueryResult::status(format!(
1586                    "Created procedure '{}'",
1587                    stmt.name
1588                )))
1589            }
1590            SchemaStatement::DropProcedure { name, if_exists } => {
1591                match self.catalog.drop_procedure(&name) {
1592                    Ok(()) => {}
1593                    Err(_) if if_exists => {
1594                        return Ok(QueryResult::empty());
1595                    }
1596                    Err(e) => {
1597                        return Err(Error::Query(QueryError::new(
1598                            QueryErrorKind::Semantic,
1599                            e.to_string(),
1600                        )));
1601                    }
1602                }
1603                wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1604                Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1605            }
1606            SchemaStatement::ShowIndexes => {
1607                return self.execute_show_indexes();
1608            }
1609            SchemaStatement::ShowConstraints => {
1610                return self.execute_show_constraints();
1611            }
1612            SchemaStatement::ShowNodeTypes => {
1613                return self.execute_show_node_types();
1614            }
1615            SchemaStatement::ShowEdgeTypes => {
1616                return self.execute_show_edge_types();
1617            }
1618            SchemaStatement::ShowGraphTypes => {
1619                return self.execute_show_graph_types();
1620            }
1621            SchemaStatement::ShowGraphType(name) => {
1622                return self.execute_show_graph_type(&name);
1623            }
1624            SchemaStatement::ShowCurrentGraphType => {
1625                return self.execute_show_current_graph_type();
1626            }
1627            SchemaStatement::ShowGraphs => {
1628                return self.execute_show_graphs();
1629            }
1630            SchemaStatement::ShowSchemas => {
1631                return self.execute_show_schemas();
1632            }
1633        };
1634
1635        // Invalidate all cached query plans after any successful DDL change.
1636        // DDL is rare, so clearing the entire cache is cheap and correct.
1637        if result.is_ok() {
1638            self.query_cache.clear();
1639        }
1640
1641        result
1642    }
1643
1644    /// Creates a vector index on the store by scanning existing nodes.
1645    #[cfg(all(feature = "gql", feature = "vector-index"))]
1646    fn create_vector_index_on_store(
1647        store: &LpgStore,
1648        label: &str,
1649        property: &str,
1650        dimensions: Option<usize>,
1651        metric: Option<&str>,
1652    ) -> Result<()> {
1653        use grafeo_common::types::{PropertyKey, Value};
1654        use grafeo_common::utils::error::Error;
1655        use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};
1656
1657        let metric = match metric {
1658            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1659                Error::Internal(format!(
1660                    "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
1661                ))
1662            })?,
1663            None => DistanceMetric::Cosine,
1664        };
1665
1666        let prop_key = PropertyKey::new(property);
1667        let mut found_dims: Option<usize> = dimensions;
1668        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1669
1670        for node in store.nodes_with_label(label) {
1671            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1672                if let Some(expected) = found_dims {
1673                    if v.len() != expected {
1674                        return Err(Error::Internal(format!(
1675                            "Vector dimension mismatch: expected {expected}, found {} on node {}",
1676                            v.len(),
1677                            node.id.0
1678                        )));
1679                    }
1680                } else {
1681                    found_dims = Some(v.len());
1682                }
1683                vectors.push((node.id, v.to_vec()));
1684            }
1685        }
1686
1687        let Some(dims) = found_dims else {
1688            return Err(Error::Internal(format!(
1689                "No vector properties found on :{label}({property}) and no dimensions specified"
1690            )));
1691        };
1692
1693        let config = HnswConfig::new(dims, metric);
1694        let index = HnswIndex::with_capacity(config, vectors.len());
1695        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
1696        for (node_id, vec) in &vectors {
1697            index.insert(*node_id, vec, &accessor);
1698        }
1699
1700        store.add_vector_index(label, property, Arc::new(index));
1701        Ok(())
1702    }
1703
1704    /// Stub for when vector-index feature is not enabled.
1705    #[cfg(all(feature = "gql", not(feature = "vector-index")))]
1706    fn create_vector_index_on_store(
1707        _store: &LpgStore,
1708        _label: &str,
1709        _property: &str,
1710        _dimensions: Option<usize>,
1711        _metric: Option<&str>,
1712    ) -> Result<()> {
1713        Err(grafeo_common::utils::error::Error::Internal(
1714            "Vector index support requires the 'vector-index' feature".to_string(),
1715        ))
1716    }
1717
1718    /// Creates a text index on the store by scanning existing nodes.
1719    #[cfg(all(feature = "gql", feature = "text-index"))]
1720    fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
1721        use grafeo_common::types::{PropertyKey, Value};
1722        use grafeo_core::index::text::{BM25Config, InvertedIndex};
1723
1724        let mut index = InvertedIndex::new(BM25Config::default());
1725        let prop_key = PropertyKey::new(property);
1726
1727        let nodes = store.nodes_by_label(label);
1728        for node_id in nodes {
1729            if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
1730                index.insert(node_id, text.as_str());
1731            }
1732        }
1733
1734        store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
1735        Ok(())
1736    }
1737
1738    /// Stub for when text-index feature is not enabled.
1739    #[cfg(all(feature = "gql", not(feature = "text-index")))]
1740    fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
1741        Err(grafeo_common::utils::error::Error::Internal(
1742            "Text index support requires the 'text-index' feature".to_string(),
1743        ))
1744    }
1745
1746    /// Returns a table of all indexes from the catalog.
1747    fn execute_show_indexes(&self) -> Result<QueryResult> {
1748        let indexes = self.catalog.all_indexes();
1749        let columns = vec![
1750            "name".to_string(),
1751            "type".to_string(),
1752            "label".to_string(),
1753            "property".to_string(),
1754        ];
1755        let rows: Vec<Vec<Value>> = indexes
1756            .into_iter()
1757            .map(|def| {
1758                let label_name = self
1759                    .catalog
1760                    .get_label_name(def.label)
1761                    .unwrap_or_else(|| "?".into());
1762                let prop_name = self
1763                    .catalog
1764                    .get_property_key_name(def.property_key)
1765                    .unwrap_or_else(|| "?".into());
1766                vec![
1767                    Value::from(format!("idx_{}_{}", label_name, prop_name)),
1768                    Value::from(format!("{:?}", def.index_type)),
1769                    Value::from(&*label_name),
1770                    Value::from(&*prop_name),
1771                ]
1772            })
1773            .collect();
1774        Ok(QueryResult {
1775            columns,
1776            column_types: Vec::new(),
1777            rows,
1778            ..QueryResult::empty()
1779        })
1780    }
1781
1782    /// Returns a table of all constraints (currently metadata-only).
1783    fn execute_show_constraints(&self) -> Result<QueryResult> {
1784        // Constraints are tracked in WAL but not yet in a queryable catalog.
1785        // Return an empty table with the expected schema.
1786        Ok(QueryResult {
1787            columns: vec![
1788                "name".to_string(),
1789                "type".to_string(),
1790                "label".to_string(),
1791                "properties".to_string(),
1792            ],
1793            column_types: Vec::new(),
1794            rows: Vec::new(),
1795            ..QueryResult::empty()
1796        })
1797    }
1798
1799    /// Returns a table of all registered node types.
1800    fn execute_show_node_types(&self) -> Result<QueryResult> {
1801        let columns = vec![
1802            "name".to_string(),
1803            "properties".to_string(),
1804            "constraints".to_string(),
1805            "parents".to_string(),
1806        ];
1807        let type_names = self.catalog.all_node_type_names();
1808        let rows: Vec<Vec<Value>> = type_names
1809            .into_iter()
1810            .filter_map(|name| {
1811                let def = self.catalog.get_node_type(&name)?;
1812                let props: Vec<String> = def
1813                    .properties
1814                    .iter()
1815                    .map(|p| {
1816                        let nullable = if p.nullable { "" } else { " NOT NULL" };
1817                        format!("{} {}{}", p.name, p.data_type, nullable)
1818                    })
1819                    .collect();
1820                let constraints: Vec<String> =
1821                    def.constraints.iter().map(|c| format!("{c:?}")).collect();
1822                let parents = def.parent_types.join(", ");
1823                Some(vec![
1824                    Value::from(name),
1825                    Value::from(props.join(", ")),
1826                    Value::from(constraints.join(", ")),
1827                    Value::from(parents),
1828                ])
1829            })
1830            .collect();
1831        Ok(QueryResult {
1832            columns,
1833            column_types: Vec::new(),
1834            rows,
1835            ..QueryResult::empty()
1836        })
1837    }
1838
1839    /// Returns a table of all registered edge types.
1840    fn execute_show_edge_types(&self) -> Result<QueryResult> {
1841        let columns = vec![
1842            "name".to_string(),
1843            "properties".to_string(),
1844            "source_types".to_string(),
1845            "target_types".to_string(),
1846        ];
1847        let type_names = self.catalog.all_edge_type_names();
1848        let rows: Vec<Vec<Value>> = type_names
1849            .into_iter()
1850            .filter_map(|name| {
1851                let def = self.catalog.get_edge_type_def(&name)?;
1852                let props: Vec<String> = def
1853                    .properties
1854                    .iter()
1855                    .map(|p| {
1856                        let nullable = if p.nullable { "" } else { " NOT NULL" };
1857                        format!("{} {}{}", p.name, p.data_type, nullable)
1858                    })
1859                    .collect();
1860                let src = def.source_node_types.join(", ");
1861                let tgt = def.target_node_types.join(", ");
1862                Some(vec![
1863                    Value::from(name),
1864                    Value::from(props.join(", ")),
1865                    Value::from(src),
1866                    Value::from(tgt),
1867                ])
1868            })
1869            .collect();
1870        Ok(QueryResult {
1871            columns,
1872            column_types: Vec::new(),
1873            rows,
1874            ..QueryResult::empty()
1875        })
1876    }
1877
1878    /// Returns a table of all registered graph types.
1879    fn execute_show_graph_types(&self) -> Result<QueryResult> {
1880        let columns = vec![
1881            "name".to_string(),
1882            "open".to_string(),
1883            "node_types".to_string(),
1884            "edge_types".to_string(),
1885        ];
1886        let type_names = self.catalog.all_graph_type_names();
1887        let rows: Vec<Vec<Value>> = type_names
1888            .into_iter()
1889            .filter_map(|name| {
1890                let def = self.catalog.get_graph_type_def(&name)?;
1891                Some(vec![
1892                    Value::from(name),
1893                    Value::from(def.open),
1894                    Value::from(def.allowed_node_types.join(", ")),
1895                    Value::from(def.allowed_edge_types.join(", ")),
1896                ])
1897            })
1898            .collect();
1899        Ok(QueryResult {
1900            columns,
1901            column_types: Vec::new(),
1902            rows,
1903            ..QueryResult::empty()
1904        })
1905    }
1906
1907    /// Returns the list of named graphs visible in the current schema context.
1908    ///
1909    /// When a session schema is set, only graphs belonging to that schema are
1910    /// shown (their compound prefix is stripped). When no schema is set, graphs
1911    /// without a schema prefix are shown (the default schema).
1912    fn execute_show_graphs(&self) -> Result<QueryResult> {
1913        let schema = self.current_schema.lock().clone();
1914        let all_names = self.store.graph_names();
1915
1916        let mut names: Vec<String> = match &schema {
1917            Some(s) => {
1918                let prefix = format!("{s}/");
1919                all_names
1920                    .into_iter()
1921                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
1922                    .collect()
1923            }
1924            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
1925        };
1926        names.sort();
1927
1928        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
1929        Ok(QueryResult {
1930            columns: vec!["name".to_string()],
1931            column_types: Vec::new(),
1932            rows,
1933            ..QueryResult::empty()
1934        })
1935    }
1936
1937    /// Returns the list of all schema namespaces.
1938    fn execute_show_schemas(&self) -> Result<QueryResult> {
1939        let mut names = self.catalog.schema_names();
1940        names.sort();
1941        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
1942        Ok(QueryResult {
1943            columns: vec!["name".to_string()],
1944            column_types: Vec::new(),
1945            rows,
1946            ..QueryResult::empty()
1947        })
1948    }
1949
1950    /// Returns detailed info for a specific graph type.
1951    fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
1952        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
1953
1954        let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
1955            Error::Query(QueryError::new(
1956                QueryErrorKind::Semantic,
1957                format!("Graph type '{name}' not found"),
1958            ))
1959        })?;
1960
1961        let columns = vec![
1962            "name".to_string(),
1963            "open".to_string(),
1964            "node_types".to_string(),
1965            "edge_types".to_string(),
1966        ];
1967        let rows = vec![vec![
1968            Value::from(def.name),
1969            Value::from(def.open),
1970            Value::from(def.allowed_node_types.join(", ")),
1971            Value::from(def.allowed_edge_types.join(", ")),
1972        ]];
1973        Ok(QueryResult {
1974            columns,
1975            column_types: Vec::new(),
1976            rows,
1977            ..QueryResult::empty()
1978        })
1979    }
1980
1981    /// Returns the graph type bound to the current graph.
1982    fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
1983        let graph_name = self
1984            .current_graph()
1985            .unwrap_or_else(|| "default".to_string());
1986        let columns = vec![
1987            "graph".to_string(),
1988            "graph_type".to_string(),
1989            "open".to_string(),
1990            "node_types".to_string(),
1991            "edge_types".to_string(),
1992        ];
1993
1994        if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
1995            && let Some(def) = self.catalog.get_graph_type_def(&type_name)
1996        {
1997            let rows = vec![vec![
1998                Value::from(graph_name),
1999                Value::from(type_name),
2000                Value::from(def.open),
2001                Value::from(def.allowed_node_types.join(", ")),
2002                Value::from(def.allowed_edge_types.join(", ")),
2003            ]];
2004            return Ok(QueryResult {
2005                columns,
2006                column_types: Vec::new(),
2007                rows,
2008                ..QueryResult::empty()
2009            });
2010        }
2011
2012        // No graph type binding found
2013        Ok(QueryResult {
2014            columns,
2015            column_types: Vec::new(),
2016            rows: vec![vec![
2017                Value::from(graph_name),
2018                Value::Null,
2019                Value::Null,
2020                Value::Null,
2021                Value::Null,
2022            ]],
2023            ..QueryResult::empty()
2024        })
2025    }
2026
2027    /// Executes a GQL query.
2028    ///
2029    /// # Errors
2030    ///
2031    /// Returns an error if the query fails to parse or execute.
2032    ///
2033    /// # Examples
2034    ///
2035    /// ```no_run
2036    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2037    /// use grafeo_engine::GrafeoDB;
2038    ///
2039    /// let db = GrafeoDB::new_in_memory();
2040    /// let session = db.session();
2041    ///
2042    /// // Create a node
2043    /// session.execute("INSERT (:Person {name: 'Alix', age: 30})")?;
2044    ///
2045    /// // Query nodes
2046    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
2047    /// for row in &result.rows {
2048    ///     println!("{:?}", row);
2049    /// }
2050    /// # Ok(())
2051    /// # }
2052    /// ```
2053    #[cfg(feature = "gql")]
2054    pub fn execute(&self, query: &str) -> Result<QueryResult> {
2055        self.require_lpg("GQL")?;
2056
2057        use crate::query::{
2058            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2059            processor::QueryLanguage, translators::gql,
2060        };
2061
2062        #[cfg(not(target_arch = "wasm32"))]
2063        let start_time = std::time::Instant::now();
2064
2065        // Parse and translate, checking for session/schema commands first
2066        let translation = gql::translate_full(query)?;
2067        let logical_plan = match translation {
2068            gql::GqlTranslationResult::SessionCommand(cmd) => {
2069                return self.execute_session_command(cmd);
2070            }
2071            gql::GqlTranslationResult::SchemaCommand(cmd) => {
2072                // All DDL is a write operation
2073                if *self.read_only_tx.lock() {
2074                    return Err(grafeo_common::utils::error::Error::Transaction(
2075                        grafeo_common::utils::error::TransactionError::ReadOnly,
2076                    ));
2077                }
2078                return self.execute_schema_command(cmd);
2079            }
2080            gql::GqlTranslationResult::Plan(plan) => {
2081                // Block mutations in read-only transactions
2082                if *self.read_only_tx.lock() && plan.root.has_mutations() {
2083                    return Err(grafeo_common::utils::error::Error::Transaction(
2084                        grafeo_common::utils::error::TransactionError::ReadOnly,
2085                    ));
2086                }
2087                plan
2088            }
2089        };
2090
2091        // Create cache key for this query
2092        let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
2093
2094        // Try to get cached optimized plan, or use the plan we just translated
2095        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2096            cached_plan
2097        } else {
2098            // Semantic validation
2099            let mut binder = Binder::new();
2100            let _binding_context = binder.bind(&logical_plan)?;
2101
2102            // Optimize the plan
2103            let active = self.active_store();
2104            let optimizer = Optimizer::from_graph_store(&*active);
2105            let plan = optimizer.optimize(logical_plan)?;
2106
2107            // Cache the optimized plan for future use
2108            self.query_cache.put_optimized(cache_key, plan.clone());
2109
2110            plan
2111        };
2112
2113        // Resolve the active store for query execution
2114        let active = self.active_store();
2115
2116        // EXPLAIN: annotate pushdown hints and return the plan tree
2117        if optimized_plan.explain {
2118            use crate::query::processor::{annotate_pushdown_hints, explain_result};
2119            let mut plan = optimized_plan;
2120            annotate_pushdown_hints(&mut plan.root, active.as_ref());
2121            return Ok(explain_result(&plan));
2122        }
2123
2124        // PROFILE: execute with per-operator instrumentation
2125        if optimized_plan.profile {
2126            let has_mutations = optimized_plan.root.has_mutations();
2127            return self.with_auto_commit(has_mutations, || {
2128                let (viewing_epoch, transaction_id) = self.get_transaction_context();
2129                let planner = self.create_planner_for_store(
2130                    Arc::clone(&active),
2131                    viewing_epoch,
2132                    transaction_id,
2133                );
2134                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2135
2136                let executor = Executor::with_columns(physical_plan.columns.clone())
2137                    .with_deadline(self.query_deadline());
2138                let _result = executor.execute(physical_plan.operator.as_mut())?;
2139
2140                let total_time_ms;
2141                #[cfg(not(target_arch = "wasm32"))]
2142                {
2143                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2144                }
2145                #[cfg(target_arch = "wasm32")]
2146                {
2147                    total_time_ms = 0.0;
2148                }
2149
2150                let profile_tree = crate::query::profile::build_profile_tree(
2151                    &optimized_plan.root,
2152                    &mut entries.into_iter(),
2153                );
2154                Ok(crate::query::profile::profile_result(
2155                    &profile_tree,
2156                    total_time_ms,
2157                ))
2158            });
2159        }
2160
2161        let has_mutations = optimized_plan.root.has_mutations();
2162
2163        let result = self.with_auto_commit(has_mutations, || {
2164            // Get transaction context for MVCC visibility
2165            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2166
2167            // Convert to physical plan with transaction context
2168            // (Physical planning cannot be cached as it depends on transaction state)
2169            let planner =
2170                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2171            let mut physical_plan = planner.plan(&optimized_plan)?;
2172
2173            // Execute the plan
2174            let executor = Executor::with_columns(physical_plan.columns.clone())
2175                .with_deadline(self.query_deadline());
2176            let mut result = executor.execute(physical_plan.operator.as_mut())?;
2177
2178            // Add execution metrics
2179            let rows_scanned = result.rows.len() as u64;
2180            #[cfg(not(target_arch = "wasm32"))]
2181            {
2182                let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2183                result.execution_time_ms = Some(elapsed_ms);
2184            }
2185            result.rows_scanned = Some(rows_scanned);
2186
2187            Ok(result)
2188        });
2189
2190        // Record metrics for this query execution.
2191        #[cfg(feature = "metrics")]
2192        {
2193            #[cfg(not(target_arch = "wasm32"))]
2194            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2195            #[cfg(target_arch = "wasm32")]
2196            let elapsed_ms = None;
2197            self.record_query_metrics("gql", elapsed_ms, &result);
2198        }
2199
2200        result
2201    }
2202
2203    /// Executes a GQL query with visibility at the specified epoch.
2204    ///
2205    /// This enables time-travel queries: the query sees the database
2206    /// as it existed at the given epoch.
2207    ///
2208    /// # Errors
2209    ///
2210    /// Returns an error if parsing or execution fails.
2211    #[cfg(feature = "gql")]
2212    pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
2213        let previous = self.viewing_epoch_override.lock().replace(epoch);
2214        let result = self.execute(query);
2215        *self.viewing_epoch_override.lock() = previous;
2216        result
2217    }
2218
2219    /// Executes a GQL query with parameters.
2220    ///
2221    /// # Errors
2222    ///
2223    /// Returns an error if the query fails to parse or execute.
2224    #[cfg(feature = "gql")]
2225    pub fn execute_with_params(
2226        &self,
2227        query: &str,
2228        params: std::collections::HashMap<String, Value>,
2229    ) -> Result<QueryResult> {
2230        self.require_lpg("GQL")?;
2231
2232        use crate::query::processor::{QueryLanguage, QueryProcessor};
2233
2234        let has_mutations = Self::query_looks_like_mutation(query);
2235        let active = self.active_store();
2236
2237        self.with_auto_commit(has_mutations, || {
2238            // Get transaction context for MVCC visibility
2239            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2240
2241            // Create processor with transaction context
2242            let processor = QueryProcessor::for_graph_store_with_transaction(
2243                Arc::clone(&active),
2244                Arc::clone(&self.transaction_manager),
2245            )?;
2246
2247            // Apply transaction context if in a transaction
2248            let processor = if let Some(transaction_id) = transaction_id {
2249                processor.with_transaction_context(viewing_epoch, transaction_id)
2250            } else {
2251                processor
2252            };
2253
2254            processor.process(query, QueryLanguage::Gql, Some(&params))
2255        })
2256    }
2257
2258    /// Executes a GQL query with parameters.
2259    ///
2260    /// # Errors
2261    ///
2262    /// Returns an error if no query language is enabled.
2263    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2264    pub fn execute_with_params(
2265        &self,
2266        _query: &str,
2267        _params: std::collections::HashMap<String, Value>,
2268    ) -> Result<QueryResult> {
2269        Err(grafeo_common::utils::error::Error::Internal(
2270            "No query language enabled".to_string(),
2271        ))
2272    }
2273
2274    /// Executes a GQL query.
2275    ///
2276    /// # Errors
2277    ///
2278    /// Returns an error if no query language is enabled.
2279    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2280    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2281        Err(grafeo_common::utils::error::Error::Internal(
2282            "No query language enabled".to_string(),
2283        ))
2284    }
2285
2286    /// Executes a Cypher query.
2287    ///
2288    /// # Errors
2289    ///
2290    /// Returns an error if the query fails to parse or execute.
2291    #[cfg(feature = "cypher")]
2292    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
2293        use crate::query::{
2294            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2295            processor::QueryLanguage, translators::cypher,
2296        };
2297        use grafeo_common::utils::error::{Error as GrafeoError, QueryError, QueryErrorKind};
2298
2299        // Handle schema DDL and SHOW commands before the normal query path
2300        let translation = cypher::translate_full(query)?;
2301        match translation {
2302            cypher::CypherTranslationResult::SchemaCommand(cmd) => {
2303                if *self.read_only_tx.lock() {
2304                    return Err(GrafeoError::Query(QueryError::new(
2305                        QueryErrorKind::Semantic,
2306                        "Cannot execute schema DDL in a read-only transaction",
2307                    )));
2308                }
2309                return self.execute_schema_command(cmd);
2310            }
2311            cypher::CypherTranslationResult::ShowIndexes => {
2312                return self.execute_show_indexes();
2313            }
2314            cypher::CypherTranslationResult::ShowConstraints => {
2315                return self.execute_show_constraints();
2316            }
2317            cypher::CypherTranslationResult::ShowCurrentGraphType => {
2318                return self.execute_show_current_graph_type();
2319            }
2320            cypher::CypherTranslationResult::Plan(_) => {
2321                // Fall through to normal execution below
2322            }
2323        }
2324
2325        #[cfg(not(target_arch = "wasm32"))]
2326        let start_time = std::time::Instant::now();
2327
2328        // Create cache key for this query
2329        let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());
2330
2331        // Try to get cached optimized plan
2332        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2333            cached_plan
2334        } else {
2335            // Parse and translate the query to a logical plan
2336            let logical_plan = cypher::translate(query)?;
2337
2338            // Semantic validation
2339            let mut binder = Binder::new();
2340            let _binding_context = binder.bind(&logical_plan)?;
2341
2342            // Optimize the plan
2343            let active = self.active_store();
2344            let optimizer = Optimizer::from_graph_store(&*active);
2345            let plan = optimizer.optimize(logical_plan)?;
2346
2347            // Cache the optimized plan
2348            self.query_cache.put_optimized(cache_key, plan.clone());
2349
2350            plan
2351        };
2352
2353        // Resolve the active store for query execution
2354        let active = self.active_store();
2355
2356        // EXPLAIN
2357        if optimized_plan.explain {
2358            use crate::query::processor::{annotate_pushdown_hints, explain_result};
2359            let mut plan = optimized_plan;
2360            annotate_pushdown_hints(&mut plan.root, active.as_ref());
2361            return Ok(explain_result(&plan));
2362        }
2363
2364        // PROFILE
2365        if optimized_plan.profile {
2366            let has_mutations = optimized_plan.root.has_mutations();
2367            return self.with_auto_commit(has_mutations, || {
2368                let (viewing_epoch, transaction_id) = self.get_transaction_context();
2369                let planner = self.create_planner_for_store(
2370                    Arc::clone(&active),
2371                    viewing_epoch,
2372                    transaction_id,
2373                );
2374                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2375
2376                let executor = Executor::with_columns(physical_plan.columns.clone())
2377                    .with_deadline(self.query_deadline());
2378                let _result = executor.execute(physical_plan.operator.as_mut())?;
2379
2380                let total_time_ms;
2381                #[cfg(not(target_arch = "wasm32"))]
2382                {
2383                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2384                }
2385                #[cfg(target_arch = "wasm32")]
2386                {
2387                    total_time_ms = 0.0;
2388                }
2389
2390                let profile_tree = crate::query::profile::build_profile_tree(
2391                    &optimized_plan.root,
2392                    &mut entries.into_iter(),
2393                );
2394                Ok(crate::query::profile::profile_result(
2395                    &profile_tree,
2396                    total_time_ms,
2397                ))
2398            });
2399        }
2400
2401        let has_mutations = optimized_plan.root.has_mutations();
2402
2403        let result = self.with_auto_commit(has_mutations, || {
2404            // Get transaction context for MVCC visibility
2405            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2406
2407            // Convert to physical plan with transaction context
2408            let planner =
2409                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2410            let mut physical_plan = planner.plan(&optimized_plan)?;
2411
2412            // Execute the plan
2413            let executor = Executor::with_columns(physical_plan.columns.clone())
2414                .with_deadline(self.query_deadline());
2415            executor.execute(physical_plan.operator.as_mut())
2416        });
2417
2418        #[cfg(feature = "metrics")]
2419        {
2420            #[cfg(not(target_arch = "wasm32"))]
2421            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2422            #[cfg(target_arch = "wasm32")]
2423            let elapsed_ms = None;
2424            self.record_query_metrics("cypher", elapsed_ms, &result);
2425        }
2426
2427        result
2428    }
2429
2430    /// Executes a Gremlin query.
2431    ///
2432    /// # Errors
2433    ///
2434    /// Returns an error if the query fails to parse or execute.
2435    ///
2436    /// # Examples
2437    ///
2438    /// ```no_run
2439    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2440    /// use grafeo_engine::GrafeoDB;
2441    ///
2442    /// let db = GrafeoDB::new_in_memory();
2443    /// let session = db.session();
2444    ///
2445    /// // Create some nodes first
2446    /// session.create_node(&["Person"]);
2447    ///
2448    /// // Query using Gremlin
2449    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
2450    /// # Ok(())
2451    /// # }
2452    /// ```
2453    #[cfg(feature = "gremlin")]
2454    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
2455        use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};
2456
2457        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2458        let start_time = Instant::now();
2459
2460        // Parse and translate the query to a logical plan
2461        let logical_plan = gremlin::translate(query)?;
2462
2463        // Semantic validation
2464        let mut binder = Binder::new();
2465        let _binding_context = binder.bind(&logical_plan)?;
2466
2467        // Optimize the plan
2468        let active = self.active_store();
2469        let optimizer = Optimizer::from_graph_store(&*active);
2470        let optimized_plan = optimizer.optimize(logical_plan)?;
2471
2472        let has_mutations = optimized_plan.root.has_mutations();
2473
2474        let result = self.with_auto_commit(has_mutations, || {
2475            // Get transaction context for MVCC visibility
2476            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2477
2478            // Convert to physical plan with transaction context
2479            let planner =
2480                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2481            let mut physical_plan = planner.plan(&optimized_plan)?;
2482
2483            // Execute the plan
2484            let executor = Executor::with_columns(physical_plan.columns.clone())
2485                .with_deadline(self.query_deadline());
2486            executor.execute(physical_plan.operator.as_mut())
2487        });
2488
2489        #[cfg(feature = "metrics")]
2490        {
2491            #[cfg(not(target_arch = "wasm32"))]
2492            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2493            #[cfg(target_arch = "wasm32")]
2494            let elapsed_ms = None;
2495            self.record_query_metrics("gremlin", elapsed_ms, &result);
2496        }
2497
2498        result
2499    }
2500
2501    /// Executes a Gremlin query with parameters.
2502    ///
2503    /// # Errors
2504    ///
2505    /// Returns an error if the query fails to parse or execute.
2506    #[cfg(feature = "gremlin")]
2507    pub fn execute_gremlin_with_params(
2508        &self,
2509        query: &str,
2510        params: std::collections::HashMap<String, Value>,
2511    ) -> Result<QueryResult> {
2512        use crate::query::processor::{QueryLanguage, QueryProcessor};
2513
2514        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2515        let start_time = Instant::now();
2516
2517        let has_mutations = Self::query_looks_like_mutation(query);
2518        let active = self.active_store();
2519
2520        let result = self.with_auto_commit(has_mutations, || {
2521            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2522            let processor = QueryProcessor::for_graph_store_with_transaction(
2523                Arc::clone(&active),
2524                Arc::clone(&self.transaction_manager),
2525            )?;
2526            let processor = if let Some(transaction_id) = transaction_id {
2527                processor.with_transaction_context(viewing_epoch, transaction_id)
2528            } else {
2529                processor
2530            };
2531            processor.process(query, QueryLanguage::Gremlin, Some(&params))
2532        });
2533
2534        #[cfg(feature = "metrics")]
2535        {
2536            #[cfg(not(target_arch = "wasm32"))]
2537            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2538            #[cfg(target_arch = "wasm32")]
2539            let elapsed_ms = None;
2540            self.record_query_metrics("gremlin", elapsed_ms, &result);
2541        }
2542
2543        result
2544    }
2545
2546    /// Executes a GraphQL query against the LPG store.
2547    ///
2548    /// # Errors
2549    ///
2550    /// Returns an error if the query fails to parse or execute.
2551    ///
2552    /// # Examples
2553    ///
2554    /// ```no_run
2555    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2556    /// use grafeo_engine::GrafeoDB;
2557    ///
2558    /// let db = GrafeoDB::new_in_memory();
2559    /// let session = db.session();
2560    ///
2561    /// // Create some nodes first
2562    /// session.create_node(&["User"]);
2563    ///
2564    /// // Query using GraphQL
2565    /// let result = session.execute_graphql("query { user { id name } }")?;
2566    /// # Ok(())
2567    /// # }
2568    /// ```
2569    #[cfg(feature = "graphql")]
2570    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
2571        use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::graphql};
2572
2573        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2574        let start_time = Instant::now();
2575
2576        let logical_plan = graphql::translate(query)?;
2577        let mut binder = Binder::new();
2578        let _binding_context = binder.bind(&logical_plan)?;
2579
2580        let active = self.active_store();
2581        let optimizer = Optimizer::from_graph_store(&*active);
2582        let optimized_plan = optimizer.optimize(logical_plan)?;
2583        let has_mutations = optimized_plan.root.has_mutations();
2584
2585        let result = self.with_auto_commit(has_mutations, || {
2586            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2587            let planner =
2588                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2589            let mut physical_plan = planner.plan(&optimized_plan)?;
2590            let executor = Executor::with_columns(physical_plan.columns.clone())
2591                .with_deadline(self.query_deadline());
2592            executor.execute(physical_plan.operator.as_mut())
2593        });
2594
2595        #[cfg(feature = "metrics")]
2596        {
2597            #[cfg(not(target_arch = "wasm32"))]
2598            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2599            #[cfg(target_arch = "wasm32")]
2600            let elapsed_ms = None;
2601            self.record_query_metrics("graphql", elapsed_ms, &result);
2602        }
2603
2604        result
2605    }
2606
2607    /// Executes a GraphQL query with parameters.
2608    ///
2609    /// # Errors
2610    ///
2611    /// Returns an error if the query fails to parse or execute.
2612    #[cfg(feature = "graphql")]
2613    pub fn execute_graphql_with_params(
2614        &self,
2615        query: &str,
2616        params: std::collections::HashMap<String, Value>,
2617    ) -> Result<QueryResult> {
2618        use crate::query::processor::{QueryLanguage, QueryProcessor};
2619
2620        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2621        let start_time = Instant::now();
2622
2623        let has_mutations = Self::query_looks_like_mutation(query);
2624        let active = self.active_store();
2625
2626        let result = self.with_auto_commit(has_mutations, || {
2627            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2628            let processor = QueryProcessor::for_graph_store_with_transaction(
2629                Arc::clone(&active),
2630                Arc::clone(&self.transaction_manager),
2631            )?;
2632            let processor = if let Some(transaction_id) = transaction_id {
2633                processor.with_transaction_context(viewing_epoch, transaction_id)
2634            } else {
2635                processor
2636            };
2637            processor.process(query, QueryLanguage::GraphQL, Some(&params))
2638        });
2639
2640        #[cfg(feature = "metrics")]
2641        {
2642            #[cfg(not(target_arch = "wasm32"))]
2643            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2644            #[cfg(target_arch = "wasm32")]
2645            let elapsed_ms = None;
2646            self.record_query_metrics("graphql", elapsed_ms, &result);
2647        }
2648
2649        result
2650    }
2651
2652    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
2653    ///
2654    /// # Errors
2655    ///
2656    /// Returns an error if the query fails to parse or execute.
2657    ///
2658    /// # Examples
2659    ///
2660    /// ```no_run
2661    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2662    /// use grafeo_engine::GrafeoDB;
2663    ///
2664    /// let db = GrafeoDB::new_in_memory();
2665    /// let session = db.session();
2666    ///
2667    /// let result = session.execute_sql(
2668    ///     "SELECT * FROM GRAPH_TABLE (
2669    ///         MATCH (n:Person)
2670    ///         COLUMNS (n.name AS name)
2671    ///     )"
2672    /// )?;
2673    /// # Ok(())
2674    /// # }
2675    /// ```
2676    #[cfg(feature = "sql-pgq")]
2677    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
2678        use crate::query::{
2679            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
2680            processor::QueryLanguage, translators::sql_pgq,
2681        };
2682
2683        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2684        let start_time = Instant::now();
2685
2686        // Parse and translate (always needed to check for DDL)
2687        let logical_plan = sql_pgq::translate(query)?;
2688
2689        // Handle DDL statements directly (they don't go through the query pipeline)
2690        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
2691            return Ok(QueryResult {
2692                columns: vec!["status".into()],
2693                column_types: vec![grafeo_common::types::LogicalType::String],
2694                rows: vec![vec![Value::from(format!(
2695                    "Property graph '{}' created ({} node tables, {} edge tables)",
2696                    cpg.name,
2697                    cpg.node_tables.len(),
2698                    cpg.edge_tables.len()
2699                ))]],
2700                execution_time_ms: None,
2701                rows_scanned: None,
2702                status_message: None,
2703                gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2704            });
2705        }
2706
2707        let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());
2708
2709        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2710            cached_plan
2711        } else {
2712            let mut binder = Binder::new();
2713            let _binding_context = binder.bind(&logical_plan)?;
2714            let active = self.active_store();
2715            let optimizer = Optimizer::from_graph_store(&*active);
2716            let plan = optimizer.optimize(logical_plan)?;
2717            self.query_cache.put_optimized(cache_key, plan.clone());
2718            plan
2719        };
2720
2721        let active = self.active_store();
2722        let has_mutations = optimized_plan.root.has_mutations();
2723
2724        let result = self.with_auto_commit(has_mutations, || {
2725            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2726            let planner =
2727                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2728            let mut physical_plan = planner.plan(&optimized_plan)?;
2729            let executor = Executor::with_columns(physical_plan.columns.clone())
2730                .with_deadline(self.query_deadline());
2731            executor.execute(physical_plan.operator.as_mut())
2732        });
2733
2734        #[cfg(feature = "metrics")]
2735        {
2736            #[cfg(not(target_arch = "wasm32"))]
2737            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2738            #[cfg(target_arch = "wasm32")]
2739            let elapsed_ms = None;
2740            self.record_query_metrics("sql", elapsed_ms, &result);
2741        }
2742
2743        result
2744    }
2745
2746    /// Executes a SQL/PGQ query with parameters.
2747    ///
2748    /// # Errors
2749    ///
2750    /// Returns an error if the query fails to parse or execute.
2751    #[cfg(feature = "sql-pgq")]
2752    pub fn execute_sql_with_params(
2753        &self,
2754        query: &str,
2755        params: std::collections::HashMap<String, Value>,
2756    ) -> Result<QueryResult> {
2757        use crate::query::processor::{QueryLanguage, QueryProcessor};
2758
2759        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2760        let start_time = Instant::now();
2761
2762        let has_mutations = Self::query_looks_like_mutation(query);
2763        let active = self.active_store();
2764
2765        let result = self.with_auto_commit(has_mutations, || {
2766            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2767            let processor = QueryProcessor::for_graph_store_with_transaction(
2768                Arc::clone(&active),
2769                Arc::clone(&self.transaction_manager),
2770            )?;
2771            let processor = if let Some(transaction_id) = transaction_id {
2772                processor.with_transaction_context(viewing_epoch, transaction_id)
2773            } else {
2774                processor
2775            };
2776            processor.process(query, QueryLanguage::SqlPgq, Some(&params))
2777        });
2778
2779        #[cfg(feature = "metrics")]
2780        {
2781            #[cfg(not(target_arch = "wasm32"))]
2782            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2783            #[cfg(target_arch = "wasm32")]
2784            let elapsed_ms = None;
2785            self.record_query_metrics("sql", elapsed_ms, &result);
2786        }
2787
2788        result
2789    }
2790
2791    /// Executes a query in the specified language by name.
2792    ///
2793    /// Supported language names: `"gql"`, `"cypher"`, `"gremlin"`, `"graphql"`,
2794    /// `"graphql-rdf"`, `"sparql"`, `"sql"`. Each requires the corresponding feature flag.
2795    ///
2796    /// # Errors
2797    ///
2798    /// Returns an error if the language is unknown/disabled or the query fails.
2799    pub fn execute_language(
2800        &self,
2801        query: &str,
2802        language: &str,
2803        params: Option<std::collections::HashMap<String, Value>>,
2804    ) -> Result<QueryResult> {
2805        match language {
2806            "gql" => {
2807                if let Some(p) = params {
2808                    self.execute_with_params(query, p)
2809                } else {
2810                    self.execute(query)
2811                }
2812            }
2813            #[cfg(feature = "cypher")]
2814            "cypher" => {
2815                if let Some(p) = params {
2816                    use crate::query::processor::{QueryLanguage, QueryProcessor};
2817
2818                    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2819                    let start_time = Instant::now();
2820
2821                    let has_mutations = Self::query_looks_like_mutation(query);
2822                    let active = self.active_store();
2823                    let result = self.with_auto_commit(has_mutations, || {
2824                        let processor = QueryProcessor::for_graph_store_with_transaction(
2825                            Arc::clone(&active),
2826                            Arc::clone(&self.transaction_manager),
2827                        )?;
2828                        let (viewing_epoch, transaction_id) = self.get_transaction_context();
2829                        let processor = if let Some(transaction_id) = transaction_id {
2830                            processor.with_transaction_context(viewing_epoch, transaction_id)
2831                        } else {
2832                            processor
2833                        };
2834                        processor.process(query, QueryLanguage::Cypher, Some(&p))
2835                    });
2836
2837                    #[cfg(feature = "metrics")]
2838                    {
2839                        #[cfg(not(target_arch = "wasm32"))]
2840                        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2841                        #[cfg(target_arch = "wasm32")]
2842                        let elapsed_ms = None;
2843                        self.record_query_metrics("cypher", elapsed_ms, &result);
2844                    }
2845
2846                    result
2847                } else {
2848                    self.execute_cypher(query)
2849                }
2850            }
2851            #[cfg(feature = "gremlin")]
2852            "gremlin" => {
2853                if let Some(p) = params {
2854                    self.execute_gremlin_with_params(query, p)
2855                } else {
2856                    self.execute_gremlin(query)
2857                }
2858            }
2859            #[cfg(feature = "graphql")]
2860            "graphql" => {
2861                if let Some(p) = params {
2862                    self.execute_graphql_with_params(query, p)
2863                } else {
2864                    self.execute_graphql(query)
2865                }
2866            }
2867            #[cfg(all(feature = "graphql", feature = "rdf"))]
2868            "graphql-rdf" => {
2869                if let Some(p) = params {
2870                    self.execute_graphql_rdf_with_params(query, p)
2871                } else {
2872                    self.execute_graphql_rdf(query)
2873                }
2874            }
2875            #[cfg(feature = "sql-pgq")]
2876            "sql" | "sql-pgq" => {
2877                if let Some(p) = params {
2878                    self.execute_sql_with_params(query, p)
2879                } else {
2880                    self.execute_sql(query)
2881                }
2882            }
2883            #[cfg(all(feature = "sparql", feature = "rdf"))]
2884            "sparql" => {
2885                if let Some(p) = params {
2886                    self.execute_sparql_with_params(query, p)
2887                } else {
2888                    self.execute_sparql(query)
2889                }
2890            }
2891            other => Err(grafeo_common::utils::error::Error::Query(
2892                grafeo_common::utils::error::QueryError::new(
2893                    grafeo_common::utils::error::QueryErrorKind::Semantic,
2894                    format!("Unknown query language: '{other}'"),
2895                ),
2896            )),
2897        }
2898    }
2899
2900    /// Begins a new transaction.
2901    ///
2902    /// # Errors
2903    ///
2904    /// Returns an error if a transaction is already active.
2905    ///
2906    /// # Examples
2907    ///
2908    /// ```no_run
2909    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2910    /// use grafeo_engine::GrafeoDB;
2911    ///
2912    /// let db = GrafeoDB::new_in_memory();
2913    /// let mut session = db.session();
2914    ///
2915    /// session.begin_transaction()?;
2916    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
2917    /// session.execute("INSERT (:Person {name: 'Gus'})")?;
2918    /// session.commit()?; // Both inserts committed atomically
2919    /// # Ok(())
2920    /// # }
2921    /// ```
2922    /// Clears all cached query plans.
2923    ///
2924    /// The plan cache is shared across all sessions on the same database,
2925    /// so clearing from one session affects all sessions.
2926    pub fn clear_plan_cache(&self) {
2927        self.query_cache.clear();
2928    }
2929
2930    /// Begins a new transaction on this session.
2931    ///
2932    /// Uses the default isolation level (`SnapshotIsolation`).
2933    ///
2934    /// # Errors
2935    ///
2936    /// Returns an error if a transaction is already active.
2937    pub fn begin_transaction(&mut self) -> Result<()> {
2938        self.begin_transaction_inner(false, None)
2939    }
2940
2941    /// Begins a transaction with a specific isolation level.
2942    ///
2943    /// See [`begin_transaction`](Self::begin_transaction) for the default (`SnapshotIsolation`).
2944    ///
2945    /// # Errors
2946    ///
2947    /// Returns an error if a transaction is already active.
2948    pub fn begin_transaction_with_isolation(
2949        &mut self,
2950        isolation_level: crate::transaction::IsolationLevel,
2951    ) -> Result<()> {
2952        self.begin_transaction_inner(false, Some(isolation_level))
2953    }
2954
2955    /// Core transaction begin logic, usable from both `&mut self` and `&self` paths.
2956    fn begin_transaction_inner(
2957        &self,
2958        read_only: bool,
2959        isolation_level: Option<crate::transaction::IsolationLevel>,
2960    ) -> Result<()> {
2961        let mut current = self.current_transaction.lock();
2962        if current.is_some() {
2963            // Nested transaction: create an auto-savepoint instead of a new tx.
2964            drop(current);
2965            let mut depth = self.transaction_nesting_depth.lock();
2966            *depth += 1;
2967            let sp_name = format!("_nested_tx_{}", *depth);
2968            self.savepoint(&sp_name)?;
2969            return Ok(());
2970        }
2971
2972        let active = self.active_lpg_store();
2973        self.transaction_start_node_count
2974            .store(active.node_count(), Ordering::Relaxed);
2975        self.transaction_start_edge_count
2976            .store(active.edge_count(), Ordering::Relaxed);
2977        let transaction_id = if let Some(level) = isolation_level {
2978            self.transaction_manager.begin_with_isolation(level)
2979        } else {
2980            self.transaction_manager.begin()
2981        };
2982        *current = Some(transaction_id);
2983        *self.read_only_tx.lock() = read_only;
2984
2985        // Record the initial graph as "touched" for cross-graph atomicity.
2986        // Uses the full storage key (schema/graph) for schema-scoped resolution.
2987        let key = self.active_graph_storage_key();
2988        let mut touched = self.touched_graphs.lock();
2989        touched.clear();
2990        touched.push(key);
2991
2992        #[cfg(feature = "metrics")]
2993        {
2994            crate::metrics::record_metric!(self.metrics, tx_active, inc);
2995            #[cfg(not(target_arch = "wasm32"))]
2996            {
2997                *self.tx_start_time.lock() = Some(Instant::now());
2998            }
2999        }
3000
3001        Ok(())
3002    }
3003
3004    /// Commits the current transaction.
3005    ///
3006    /// Makes all changes since [`begin_transaction`](Self::begin_transaction) permanent.
3007    ///
3008    /// # Errors
3009    ///
3010    /// Returns an error if no transaction is active.
3011    pub fn commit(&mut self) -> Result<()> {
3012        self.commit_inner()
3013    }
3014
3015    /// Core commit logic, usable from both `&mut self` and `&self` paths.
3016    fn commit_inner(&self) -> Result<()> {
3017        // Nested transaction: release the auto-savepoint (changes are preserved).
3018        {
3019            let mut depth = self.transaction_nesting_depth.lock();
3020            if *depth > 0 {
3021                let sp_name = format!("_nested_tx_{depth}");
3022                *depth -= 1;
3023                drop(depth);
3024                return self.release_savepoint(&sp_name);
3025            }
3026        }
3027
3028        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3029            grafeo_common::utils::error::Error::Transaction(
3030                grafeo_common::utils::error::TransactionError::InvalidState(
3031                    "No active transaction".to_string(),
3032                ),
3033            )
3034        })?;
3035
3036        // Validate the transaction first (conflict detection) before committing data.
3037        // If this fails, we rollback the data changes instead of making them permanent.
3038        let touched = self.touched_graphs.lock().clone();
3039        let commit_epoch = match self.transaction_manager.commit(transaction_id) {
3040            Ok(epoch) => epoch,
3041            Err(e) => {
3042                // Conflict detected: rollback the data changes
3043                for graph_name in &touched {
3044                    let store = self.resolve_store(graph_name);
3045                    store.rollback_transaction_properties(transaction_id);
3046                }
3047                #[cfg(feature = "rdf")]
3048                self.rollback_rdf_transaction(transaction_id);
3049                *self.read_only_tx.lock() = false;
3050                self.savepoints.lock().clear();
3051                self.touched_graphs.lock().clear();
3052                #[cfg(feature = "metrics")]
3053                {
3054                    crate::metrics::record_metric!(self.metrics, tx_active, dec);
3055                    crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
3056                    #[cfg(not(target_arch = "wasm32"))]
3057                    if let Some(start) = self.tx_start_time.lock().take() {
3058                        let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3059                        crate::metrics::record_metric!(
3060                            self.metrics,
3061                            tx_duration,
3062                            observe duration_ms
3063                        );
3064                    }
3065                }
3066                return Err(e);
3067            }
3068        };
3069
3070        // Finalize PENDING epochs: make uncommitted versions visible at the commit epoch.
3071        for graph_name in &touched {
3072            let store = self.resolve_store(graph_name);
3073            store.finalize_version_epochs(transaction_id, commit_epoch);
3074        }
3075
3076        // Commit succeeded: discard undo logs (make changes permanent)
3077        #[cfg(feature = "rdf")]
3078        self.commit_rdf_transaction(transaction_id);
3079
3080        for graph_name in &touched {
3081            let store = self.resolve_store(graph_name);
3082            store.commit_transaction_properties(transaction_id);
3083        }
3084
3085        // Sync epoch for all touched graphs so that convenience lookups
3086        // (edge_type, get_edge, get_node) can see versions at the latest epoch.
3087        let current_epoch = self.transaction_manager.current_epoch();
3088        for graph_name in &touched {
3089            let store = self.resolve_store(graph_name);
3090            store.sync_epoch(current_epoch);
3091        }
3092
3093        // Reset read-only flag, clear savepoints and touched graphs
3094        *self.read_only_tx.lock() = false;
3095        self.savepoints.lock().clear();
3096        self.touched_graphs.lock().clear();
3097
3098        // Auto-GC: periodically prune old MVCC versions
3099        if self.gc_interval > 0 {
3100            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
3101            if count.is_multiple_of(self.gc_interval) {
3102                let min_epoch = self.transaction_manager.min_active_epoch();
3103                for graph_name in &touched {
3104                    let store = self.resolve_store(graph_name);
3105                    store.gc_versions(min_epoch);
3106                }
3107                self.transaction_manager.gc();
3108                #[cfg(feature = "metrics")]
3109                crate::metrics::record_metric!(self.metrics, gc_runs, inc);
3110            }
3111        }
3112
3113        #[cfg(feature = "metrics")]
3114        {
3115            crate::metrics::record_metric!(self.metrics, tx_active, dec);
3116            crate::metrics::record_metric!(self.metrics, tx_committed, inc);
3117            #[cfg(not(target_arch = "wasm32"))]
3118            if let Some(start) = self.tx_start_time.lock().take() {
3119                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3120                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3121            }
3122        }
3123
3124        Ok(())
3125    }
3126
3127    /// Aborts the current transaction.
3128    ///
3129    /// Discards all changes since [`begin_transaction`](Self::begin_transaction).
3130    ///
3131    /// # Errors
3132    ///
3133    /// Returns an error if no transaction is active.
3134    ///
3135    /// # Examples
3136    ///
3137    /// ```no_run
3138    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3139    /// use grafeo_engine::GrafeoDB;
3140    ///
3141    /// let db = GrafeoDB::new_in_memory();
3142    /// let mut session = db.session();
3143    ///
3144    /// session.begin_transaction()?;
3145    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3146    /// session.rollback()?; // Insert is discarded
3147    /// # Ok(())
3148    /// # }
3149    /// ```
3150    pub fn rollback(&mut self) -> Result<()> {
3151        self.rollback_inner()
3152    }
3153
3154    /// Core rollback logic, usable from both `&mut self` and `&self` paths.
3155    fn rollback_inner(&self) -> Result<()> {
3156        // Nested transaction: rollback to the auto-savepoint.
3157        {
3158            let mut depth = self.transaction_nesting_depth.lock();
3159            if *depth > 0 {
3160                let sp_name = format!("_nested_tx_{depth}");
3161                *depth -= 1;
3162                drop(depth);
3163                return self.rollback_to_savepoint(&sp_name);
3164            }
3165        }
3166
3167        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3168            grafeo_common::utils::error::Error::Transaction(
3169                grafeo_common::utils::error::TransactionError::InvalidState(
3170                    "No active transaction".to_string(),
3171                ),
3172            )
3173        })?;
3174
3175        // Reset read-only flag
3176        *self.read_only_tx.lock() = false;
3177
3178        // Discard uncommitted versions in ALL touched LPG stores (cross-graph atomicity).
3179        let touched = self.touched_graphs.lock().clone();
3180        for graph_name in &touched {
3181            let store = self.resolve_store(graph_name);
3182            store.discard_uncommitted_versions(transaction_id);
3183        }
3184
3185        // Discard pending operations in the RDF store
3186        #[cfg(feature = "rdf")]
3187        self.rollback_rdf_transaction(transaction_id);
3188
3189        // Clear savepoints and touched graphs
3190        self.savepoints.lock().clear();
3191        self.touched_graphs.lock().clear();
3192
3193        // Mark transaction as aborted in the manager
3194        let result = self.transaction_manager.abort(transaction_id);
3195
3196        #[cfg(feature = "metrics")]
3197        if result.is_ok() {
3198            crate::metrics::record_metric!(self.metrics, tx_active, dec);
3199            crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
3200            #[cfg(not(target_arch = "wasm32"))]
3201            if let Some(start) = self.tx_start_time.lock().take() {
3202                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3203                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3204            }
3205        }
3206
3207        result
3208    }
3209
3210    /// Creates a named savepoint within the current transaction.
3211    ///
3212    /// The savepoint captures the current node/edge ID counters so that
3213    /// [`rollback_to_savepoint`](Self::rollback_to_savepoint) can discard
3214    /// entities created after this point.
3215    ///
3216    /// # Errors
3217    ///
3218    /// Returns an error if no transaction is active.
3219    pub fn savepoint(&self, name: &str) -> Result<()> {
3220        let tx_id = self.current_transaction.lock().ok_or_else(|| {
3221            grafeo_common::utils::error::Error::Transaction(
3222                grafeo_common::utils::error::TransactionError::InvalidState(
3223                    "No active transaction".to_string(),
3224                ),
3225            )
3226        })?;
3227
3228        // Capture state for every graph touched so far.
3229        let touched = self.touched_graphs.lock().clone();
3230        let graph_snapshots: Vec<GraphSavepoint> = touched
3231            .iter()
3232            .map(|graph_name| {
3233                let store = self.resolve_store(graph_name);
3234                GraphSavepoint {
3235                    graph_name: graph_name.clone(),
3236                    next_node_id: store.peek_next_node_id(),
3237                    next_edge_id: store.peek_next_edge_id(),
3238                    undo_log_position: store.property_undo_log_position(tx_id),
3239                }
3240            })
3241            .collect();
3242
3243        self.savepoints.lock().push(SavepointState {
3244            name: name.to_string(),
3245            graph_snapshots,
3246            active_graph: self.current_graph.lock().clone(),
3247        });
3248        Ok(())
3249    }
3250
3251    /// Rolls back to a named savepoint, undoing all writes made after it.
3252    ///
3253    /// The savepoint and any savepoints created after it are removed.
3254    /// Entities with IDs >= the savepoint snapshot are discarded.
3255    ///
3256    /// # Errors
3257    ///
3258    /// Returns an error if no transaction is active or the savepoint does not exist.
3259    pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
3260        let transaction_id = self.current_transaction.lock().ok_or_else(|| {
3261            grafeo_common::utils::error::Error::Transaction(
3262                grafeo_common::utils::error::TransactionError::InvalidState(
3263                    "No active transaction".to_string(),
3264                ),
3265            )
3266        })?;
3267
3268        let mut savepoints = self.savepoints.lock();
3269
3270        // Find the savepoint by name (search from the end for nested savepoints)
3271        let pos = savepoints
3272            .iter()
3273            .rposition(|sp| sp.name == name)
3274            .ok_or_else(|| {
3275                grafeo_common::utils::error::Error::Transaction(
3276                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
3277                        "Savepoint '{name}' not found"
3278                    )),
3279                )
3280            })?;
3281
3282        let sp_state = savepoints[pos].clone();
3283
3284        // Remove this savepoint and all later ones
3285        savepoints.truncate(pos);
3286        drop(savepoints);
3287
3288        // Roll back each graph that was captured in the savepoint.
3289        for gs in &sp_state.graph_snapshots {
3290            let store = self.resolve_store(&gs.graph_name);
3291
3292            // Replay property/label undo entries recorded after the savepoint
3293            store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);
3294
3295            // Discard entities created after the savepoint
3296            let current_next_node = store.peek_next_node_id();
3297            let current_next_edge = store.peek_next_edge_id();
3298
3299            let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
3300                .map(NodeId::new)
3301                .collect();
3302            let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
3303                .map(EdgeId::new)
3304                .collect();
3305
3306            if !node_ids.is_empty() || !edge_ids.is_empty() {
3307                store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
3308            }
3309        }
3310
3311        // Also roll back any graphs that were touched AFTER the savepoint
3312        // but not captured in it. These need full discard since the savepoint
3313        // didn't include them.
3314        let touched = self.touched_graphs.lock().clone();
3315        for graph_name in &touched {
3316            let already_captured = sp_state
3317                .graph_snapshots
3318                .iter()
3319                .any(|gs| gs.graph_name == *graph_name);
3320            if !already_captured {
3321                let store = self.resolve_store(graph_name);
3322                store.discard_uncommitted_versions(transaction_id);
3323            }
3324        }
3325
3326        // Restore touched_graphs to only the graphs that were known at savepoint time.
3327        let mut touched = self.touched_graphs.lock();
3328        touched.clear();
3329        for gs in &sp_state.graph_snapshots {
3330            if !touched.contains(&gs.graph_name) {
3331                touched.push(gs.graph_name.clone());
3332            }
3333        }
3334
3335        Ok(())
3336    }
3337
3338    /// Releases (removes) a named savepoint without rolling back.
3339    ///
3340    /// # Errors
3341    ///
3342    /// Returns an error if no transaction is active or the savepoint does not exist.
3343    pub fn release_savepoint(&self, name: &str) -> Result<()> {
3344        let _tx_id = self.current_transaction.lock().ok_or_else(|| {
3345            grafeo_common::utils::error::Error::Transaction(
3346                grafeo_common::utils::error::TransactionError::InvalidState(
3347                    "No active transaction".to_string(),
3348                ),
3349            )
3350        })?;
3351
3352        let mut savepoints = self.savepoints.lock();
3353        let pos = savepoints
3354            .iter()
3355            .rposition(|sp| sp.name == name)
3356            .ok_or_else(|| {
3357                grafeo_common::utils::error::Error::Transaction(
3358                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
3359                        "Savepoint '{name}' not found"
3360                    )),
3361                )
3362            })?;
3363        savepoints.remove(pos);
3364        Ok(())
3365    }
3366
3367    /// Returns whether a transaction is active.
3368    #[must_use]
3369    pub fn in_transaction(&self) -> bool {
3370        self.current_transaction.lock().is_some()
3371    }
3372
3373    /// Returns the current transaction ID, if any.
3374    #[must_use]
3375    pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
3376        *self.current_transaction.lock()
3377    }
3378
3379    /// Returns a reference to the transaction manager.
3380    #[must_use]
3381    pub(crate) fn transaction_manager(&self) -> &TransactionManager {
3382        &self.transaction_manager
3383    }
3384
3385    /// Returns the store's current node count and the count at transaction start.
3386    #[must_use]
3387    pub(crate) fn node_count_delta(&self) -> (usize, usize) {
3388        (
3389            self.transaction_start_node_count.load(Ordering::Relaxed),
3390            self.active_lpg_store().node_count(),
3391        )
3392    }
3393
3394    /// Returns the store's current edge count and the count at transaction start.
3395    #[must_use]
3396    pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
3397        (
3398            self.transaction_start_edge_count.load(Ordering::Relaxed),
3399            self.active_lpg_store().edge_count(),
3400        )
3401    }
3402
3403    /// Prepares the current transaction for a two-phase commit.
3404    ///
3405    /// Returns a [`PreparedCommit`](crate::transaction::PreparedCommit) that
3406    /// lets you inspect pending changes and attach metadata before finalizing.
3407    /// The mutable borrow prevents concurrent operations while the commit is
3408    /// pending.
3409    ///
3410    /// If the `PreparedCommit` is dropped without calling `commit()` or
3411    /// `abort()`, the transaction is automatically rolled back.
3412    ///
3413    /// # Errors
3414    ///
3415    /// Returns an error if no transaction is active.
3416    ///
3417    /// # Examples
3418    ///
3419    /// ```no_run
3420    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3421    /// use grafeo_engine::GrafeoDB;
3422    ///
3423    /// let db = GrafeoDB::new_in_memory();
3424    /// let mut session = db.session();
3425    ///
3426    /// session.begin_transaction()?;
3427    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3428    ///
3429    /// let mut prepared = session.prepare_commit()?;
3430    /// println!("Nodes written: {}", prepared.info().nodes_written);
3431    /// prepared.set_metadata("audit_user", "admin");
3432    /// prepared.commit()?;
3433    /// # Ok(())
3434    /// # }
3435    /// ```
3436    pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
3437        crate::transaction::PreparedCommit::new(self)
3438    }
3439
3440    /// Sets auto-commit mode.
3441    pub fn set_auto_commit(&mut self, auto_commit: bool) {
3442        self.auto_commit = auto_commit;
3443    }
3444
3445    /// Returns whether auto-commit is enabled.
3446    #[must_use]
3447    pub fn auto_commit(&self) -> bool {
3448        self.auto_commit
3449    }
3450
3451    /// Returns `true` if auto-commit should wrap this execution.
3452    ///
3453    /// Auto-commit kicks in when: the session is in auto-commit mode,
3454    /// no explicit transaction is active, and the query mutates data.
3455    fn needs_auto_commit(&self, has_mutations: bool) -> bool {
3456        self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
3457    }
3458
3459    /// Wraps `body` in an automatic begin/commit when [`needs_auto_commit`]
3460    /// returns `true`. On error the transaction is rolled back.
3461    fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
3462    where
3463        F: FnOnce() -> Result<QueryResult>,
3464    {
3465        if self.needs_auto_commit(has_mutations) {
3466            self.begin_transaction_inner(false, None)?;
3467            match body() {
3468                Ok(result) => {
3469                    self.commit_inner()?;
3470                    Ok(result)
3471                }
3472                Err(e) => {
3473                    let _ = self.rollback_inner();
3474                    Err(e)
3475                }
3476            }
3477        } else {
3478            body()
3479        }
3480    }
3481
3482    /// Quick heuristic: returns `true` when the query text looks like it
3483    /// performs a mutation. Used by `_with_params` paths that go through the
3484    /// `QueryProcessor` (where the logical plan isn't available before
3485    /// execution). False negatives are harmless: the data just won't be
3486    /// auto-committed, which matches the prior behaviour.
3487    fn query_looks_like_mutation(query: &str) -> bool {
3488        let upper = query.to_ascii_uppercase();
3489        upper.contains("INSERT")
3490            || upper.contains("CREATE")
3491            || upper.contains("DELETE")
3492            || upper.contains("MERGE")
3493            || upper.contains("SET")
3494            || upper.contains("REMOVE")
3495            || upper.contains("DROP")
3496            || upper.contains("ALTER")
3497    }
3498
3499    /// Computes the wall-clock deadline for query execution.
3500    #[must_use]
3501    fn query_deadline(&self) -> Option<Instant> {
3502        #[cfg(not(target_arch = "wasm32"))]
3503        {
3504            self.query_timeout.map(|d| Instant::now() + d)
3505        }
3506        #[cfg(target_arch = "wasm32")]
3507        {
3508            let _ = &self.query_timeout;
3509            None
3510        }
3511    }
3512
3513    /// Records query metrics for any language.
3514    ///
3515    /// Called after query execution to update counters, latency histogram,
3516    /// and per-language tracking. `elapsed_ms` should be `None` on WASM
3517    /// where `Instant` is unavailable.
3518    #[cfg(feature = "metrics")]
3519    fn record_query_metrics(
3520        &self,
3521        language: &str,
3522        elapsed_ms: Option<f64>,
3523        result: &Result<crate::database::QueryResult>,
3524    ) {
3525        use crate::metrics::record_metric;
3526
3527        record_metric!(self.metrics, query_count, inc);
3528        if let Some(ref reg) = self.metrics {
3529            reg.query_count_by_language.increment(language);
3530        }
3531        if let Some(ms) = elapsed_ms {
3532            record_metric!(self.metrics, query_latency, observe ms);
3533        }
3534        match result {
3535            Ok(r) => {
3536                let returned = r.rows.len() as u64;
3537                record_metric!(self.metrics, rows_returned, add returned);
3538                if let Some(scanned) = r.rows_scanned {
3539                    record_metric!(self.metrics, rows_scanned, add scanned);
3540                }
3541            }
3542            Err(e) => {
3543                record_metric!(self.metrics, query_errors, inc);
3544                // Detect timeout errors
3545                let msg = e.to_string();
3546                if msg.contains("exceeded timeout") {
3547                    record_metric!(self.metrics, query_timeouts, inc);
3548                }
3549            }
3550        }
3551    }
3552
3553    /// Evaluates a simple integer literal from a session parameter expression.
3554    fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
3555        use grafeo_adapters::query::gql::ast::{Expression, Literal};
3556        match expr {
3557            Expression::Literal(Literal::Integer(n)) => Some(*n),
3558            _ => None,
3559        }
3560    }
3561
3562    /// Returns the current transaction context for MVCC visibility.
3563    ///
3564    /// Returns `(viewing_epoch, transaction_id)` where:
3565    /// - `viewing_epoch` is the epoch at which to check version visibility
3566    /// - `transaction_id` is the current transaction ID (if in a transaction)
3567    #[must_use]
3568    fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
3569        // Time-travel override takes precedence (read-only, no tx context)
3570        if let Some(epoch) = *self.viewing_epoch_override.lock() {
3571            return (epoch, None);
3572        }
3573
3574        if let Some(transaction_id) = *self.current_transaction.lock() {
3575            // In a transaction: use the transaction's start epoch
3576            let epoch = self
3577                .transaction_manager
3578                .start_epoch(transaction_id)
3579                .unwrap_or_else(|| self.transaction_manager.current_epoch());
3580            (epoch, Some(transaction_id))
3581        } else {
3582            // No transaction: use current epoch
3583            (self.transaction_manager.current_epoch(), None)
3584        }
3585    }
3586
3587    /// Creates a planner with transaction context and constraint validator.
3588    ///
3589    /// The `store` parameter is the graph store to plan against (use
3590    /// `self.active_store()` for graph-aware execution).
3591    fn create_planner_for_store(
3592        &self,
3593        store: Arc<dyn GraphStoreMut>,
3594        viewing_epoch: EpochId,
3595        transaction_id: Option<TransactionId>,
3596    ) -> crate::query::Planner {
3597        use crate::query::Planner;
3598        use grafeo_core::execution::operators::{LazyValue, SessionContext};
3599
3600        // Capture store reference for lazy introspection (only computed if info()/schema() called).
3601        let info_store = Arc::clone(&store);
3602        let schema_store = Arc::clone(&store);
3603
3604        let session_context = SessionContext {
3605            current_schema: self.current_schema(),
3606            current_graph: self.current_graph(),
3607            db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
3608            schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
3609        };
3610
3611        let mut planner = Planner::with_context(
3612            Arc::clone(&store),
3613            Arc::clone(&self.transaction_manager),
3614            transaction_id,
3615            viewing_epoch,
3616        )
3617        .with_factorized_execution(self.factorized_execution)
3618        .with_catalog(Arc::clone(&self.catalog))
3619        .with_session_context(session_context);
3620
3621        // Attach the constraint validator for schema enforcement
3622        let validator =
3623            CatalogConstraintValidator::new(Arc::clone(&self.catalog)).with_store(store);
3624        planner = planner.with_validator(Arc::new(validator));
3625
3626        planner
3627    }
3628
3629    /// Builds a `Value::Map` for the `info()` introspection function.
3630    fn build_info_value(store: &dyn GraphStoreMut) -> Value {
3631        use grafeo_common::types::PropertyKey;
3632        use std::collections::BTreeMap;
3633
3634        let mut map = BTreeMap::new();
3635        map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
3636        map.insert(
3637            PropertyKey::from("node_count"),
3638            Value::Int64(store.node_count() as i64),
3639        );
3640        map.insert(
3641            PropertyKey::from("edge_count"),
3642            Value::Int64(store.edge_count() as i64),
3643        );
3644        map.insert(
3645            PropertyKey::from("version"),
3646            Value::String(env!("CARGO_PKG_VERSION").into()),
3647        );
3648        Value::Map(map.into())
3649    }
3650
3651    /// Builds a `Value::Map` for the `schema()` introspection function.
3652    fn build_schema_value(store: &dyn GraphStoreMut) -> Value {
3653        use grafeo_common::types::PropertyKey;
3654        use std::collections::BTreeMap;
3655
3656        let labels: Vec<Value> = store
3657            .all_labels()
3658            .into_iter()
3659            .map(|l| Value::String(l.into()))
3660            .collect();
3661        let edge_types: Vec<Value> = store
3662            .all_edge_types()
3663            .into_iter()
3664            .map(|t| Value::String(t.into()))
3665            .collect();
3666        let property_keys: Vec<Value> = store
3667            .all_property_keys()
3668            .into_iter()
3669            .map(|k| Value::String(k.into()))
3670            .collect();
3671
3672        let mut map = BTreeMap::new();
3673        map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
3674        map.insert(
3675            PropertyKey::from("edge_types"),
3676            Value::List(edge_types.into()),
3677        );
3678        map.insert(
3679            PropertyKey::from("property_keys"),
3680            Value::List(property_keys.into()),
3681        );
3682        Value::Map(map.into())
3683    }
3684
3685    /// Creates a node directly (bypassing query execution).
3686    ///
3687    /// This is a low-level API for testing and direct manipulation.
3688    /// If a transaction is active, the node will be versioned with the transaction ID.
3689    pub fn create_node(&self, labels: &[&str]) -> NodeId {
3690        let (epoch, transaction_id) = self.get_transaction_context();
3691        self.active_lpg_store().create_node_versioned(
3692            labels,
3693            epoch,
3694            transaction_id.unwrap_or(TransactionId::SYSTEM),
3695        )
3696    }
3697
3698    /// Creates a node with properties.
3699    ///
3700    /// If a transaction is active, the node will be versioned with the transaction ID.
3701    pub fn create_node_with_props<'a>(
3702        &self,
3703        labels: &[&str],
3704        properties: impl IntoIterator<Item = (&'a str, Value)>,
3705    ) -> NodeId {
3706        let (epoch, transaction_id) = self.get_transaction_context();
3707        self.active_lpg_store().create_node_with_props_versioned(
3708            labels,
3709            properties,
3710            epoch,
3711            transaction_id.unwrap_or(TransactionId::SYSTEM),
3712        )
3713    }
3714
3715    /// Creates an edge between two nodes.
3716    ///
3717    /// This is a low-level API for testing and direct manipulation.
3718    /// If a transaction is active, the edge will be versioned with the transaction ID.
3719    pub fn create_edge(
3720        &self,
3721        src: NodeId,
3722        dst: NodeId,
3723        edge_type: &str,
3724    ) -> grafeo_common::types::EdgeId {
3725        let (epoch, transaction_id) = self.get_transaction_context();
3726        self.active_lpg_store().create_edge_versioned(
3727            src,
3728            dst,
3729            edge_type,
3730            epoch,
3731            transaction_id.unwrap_or(TransactionId::SYSTEM),
3732        )
3733    }
3734
3735    // =========================================================================
3736    // Direct Lookup APIs (bypass query planning for O(1) point reads)
3737    // =========================================================================
3738
3739    /// Gets a node by ID directly, bypassing query planning.
3740    ///
3741    /// This is the fastest way to retrieve a single node when you know its ID.
3742    /// Skips parsing, binding, optimization, and physical planning entirely.
3743    ///
3744    /// # Performance
3745    ///
3746    /// - Time complexity: O(1) average case
3747    /// - No lock contention (uses DashMap internally)
3748    /// - ~20-30x faster than equivalent MATCH query
3749    ///
3750    /// # Example
3751    ///
3752    /// ```no_run
3753    /// # use grafeo_engine::GrafeoDB;
3754    /// # let db = GrafeoDB::new_in_memory();
3755    /// let session = db.session();
3756    /// let node_id = session.create_node(&["Person"]);
3757    ///
3758    /// // Direct lookup - O(1), no query planning
3759    /// let node = session.get_node(node_id);
3760    /// assert!(node.is_some());
3761    /// ```
3762    #[must_use]
3763    pub fn get_node(&self, id: NodeId) -> Option<Node> {
3764        let (epoch, transaction_id) = self.get_transaction_context();
3765        self.active_lpg_store().get_node_versioned(
3766            id,
3767            epoch,
3768            transaction_id.unwrap_or(TransactionId::SYSTEM),
3769        )
3770    }
3771
3772    /// Gets a single property from a node by ID, bypassing query planning.
3773    ///
3774    /// More efficient than `get_node()` when you only need one property,
3775    /// as it avoids loading the full node with all properties.
3776    ///
3777    /// # Performance
3778    ///
3779    /// - Time complexity: O(1) average case
3780    /// - No query planning overhead
3781    ///
3782    /// # Example
3783    ///
3784    /// ```no_run
3785    /// # use grafeo_engine::GrafeoDB;
3786    /// # use grafeo_common::types::Value;
3787    /// # let db = GrafeoDB::new_in_memory();
3788    /// let session = db.session();
3789    /// let id = session.create_node_with_props(&["Person"], [("name", "Alix".into())]);
3790    ///
3791    /// // Direct property access - O(1)
3792    /// let name = session.get_node_property(id, "name");
3793    /// assert_eq!(name, Some(Value::String("Alix".into())));
3794    /// ```
3795    #[must_use]
3796    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
3797        self.get_node(id)
3798            .and_then(|node| node.get_property(key).cloned())
3799    }
3800
3801    /// Gets an edge by ID directly, bypassing query planning.
3802    ///
3803    /// # Performance
3804    ///
3805    /// - Time complexity: O(1) average case
3806    /// - No lock contention
3807    #[must_use]
3808    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
3809        let (epoch, transaction_id) = self.get_transaction_context();
3810        self.active_lpg_store().get_edge_versioned(
3811            id,
3812            epoch,
3813            transaction_id.unwrap_or(TransactionId::SYSTEM),
3814        )
3815    }
3816
3817    /// Gets outgoing neighbors of a node directly, bypassing query planning.
3818    ///
3819    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
3820    ///
3821    /// # Performance
3822    ///
3823    /// - Time complexity: O(degree) where degree is the number of outgoing edges
3824    /// - Uses adjacency index for direct access
3825    /// - ~10-20x faster than equivalent MATCH query
3826    ///
3827    /// # Example
3828    ///
3829    /// ```no_run
3830    /// # use grafeo_engine::GrafeoDB;
3831    /// # let db = GrafeoDB::new_in_memory();
3832    /// let session = db.session();
3833    /// let alix = session.create_node(&["Person"]);
3834    /// let gus = session.create_node(&["Person"]);
3835    /// session.create_edge(alix, gus, "KNOWS");
3836    ///
3837    /// // Direct neighbor lookup - O(degree)
3838    /// let neighbors = session.get_neighbors_outgoing(alix);
3839    /// assert_eq!(neighbors.len(), 1);
3840    /// assert_eq!(neighbors[0].0, gus);
3841    /// ```
3842    #[must_use]
3843    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
3844        self.active_lpg_store()
3845            .edges_from(node, Direction::Outgoing)
3846            .collect()
3847    }
3848
3849    /// Gets incoming neighbors of a node directly, bypassing query planning.
3850    ///
3851    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
3852    ///
3853    /// # Performance
3854    ///
3855    /// - Time complexity: O(degree) where degree is the number of incoming edges
3856    /// - Uses backward adjacency index for direct access
3857    #[must_use]
3858    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
3859        self.active_lpg_store()
3860            .edges_from(node, Direction::Incoming)
3861            .collect()
3862    }
3863
3864    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
3865    ///
3866    /// # Example
3867    ///
3868    /// ```no_run
3869    /// # use grafeo_engine::GrafeoDB;
3870    /// # let db = GrafeoDB::new_in_memory();
3871    /// # let session = db.session();
3872    /// # let alix = session.create_node(&["Person"]);
3873    /// let neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
3874    /// ```
3875    #[must_use]
3876    pub fn get_neighbors_outgoing_by_type(
3877        &self,
3878        node: NodeId,
3879        edge_type: &str,
3880    ) -> Vec<(NodeId, EdgeId)> {
3881        self.active_lpg_store()
3882            .edges_from(node, Direction::Outgoing)
3883            .filter(|(_, edge_id)| {
3884                self.get_edge(*edge_id)
3885                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
3886            })
3887            .collect()
3888    }
3889
3890    /// Checks if a node exists, bypassing query planning.
3891    ///
3892    /// # Performance
3893    ///
3894    /// - Time complexity: O(1)
3895    /// - Fastest existence check available
3896    #[must_use]
3897    pub fn node_exists(&self, id: NodeId) -> bool {
3898        self.get_node(id).is_some()
3899    }
3900
3901    /// Checks if an edge exists, bypassing query planning.
3902    #[must_use]
3903    pub fn edge_exists(&self, id: EdgeId) -> bool {
3904        self.get_edge(id).is_some()
3905    }
3906
3907    /// Gets the degree (number of edges) of a node.
3908    ///
3909    /// Returns (outgoing_degree, incoming_degree).
3910    #[must_use]
3911    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
3912        let active = self.active_lpg_store();
3913        let out = active.out_degree(node);
3914        let in_degree = active.in_degree(node);
3915        (out, in_degree)
3916    }
3917
3918    /// Batch lookup of multiple nodes by ID.
3919    ///
3920    /// More efficient than calling `get_node()` in a loop because it
3921    /// amortizes overhead.
3922    ///
3923    /// # Performance
3924    ///
3925    /// - Time complexity: O(n) where n is the number of IDs
3926    /// - Better cache utilization than individual lookups
3927    #[must_use]
3928    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
3929        let (epoch, transaction_id) = self.get_transaction_context();
3930        let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
3931        let active = self.active_lpg_store();
3932        ids.iter()
3933            .map(|&id| active.get_node_versioned(id, epoch, tx))
3934            .collect()
3935    }
3936
3937    // ── Change Data Capture ─────────────────────────────────────────────
3938
3939    /// Returns the full change history for an entity (node or edge).
3940    #[cfg(feature = "cdc")]
3941    pub fn history(
3942        &self,
3943        entity_id: impl Into<crate::cdc::EntityId>,
3944    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
3945        Ok(self.cdc_log.history(entity_id.into()))
3946    }
3947
3948    /// Returns change events for an entity since the given epoch.
3949    #[cfg(feature = "cdc")]
3950    pub fn history_since(
3951        &self,
3952        entity_id: impl Into<crate::cdc::EntityId>,
3953        since_epoch: EpochId,
3954    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
3955        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
3956    }
3957
3958    /// Returns all change events across all entities in an epoch range.
3959    #[cfg(feature = "cdc")]
3960    pub fn changes_between(
3961        &self,
3962        start_epoch: EpochId,
3963        end_epoch: EpochId,
3964    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
3965        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
3966    }
3967}
3968
3969impl Drop for Session {
3970    fn drop(&mut self) {
3971        // Auto-rollback any active transaction to prevent leaked MVCC state,
3972        // dangling write locks, and uncommitted versions lingering in the store.
3973        if self.in_transaction() {
3974            let _ = self.rollback_inner();
3975        }
3976
3977        #[cfg(feature = "metrics")]
3978        if let Some(ref reg) = self.metrics {
3979            reg.session_active
3980                .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
3981        }
3982    }
3983}
3984
3985#[cfg(test)]
3986mod tests {
3987    use super::parse_default_literal;
3988    use crate::database::GrafeoDB;
3989    use grafeo_common::types::Value;
3990
3991    // -----------------------------------------------------------------------
3992    // parse_default_literal
3993    // -----------------------------------------------------------------------
3994
3995    #[test]
3996    fn parse_default_literal_null() {
3997        assert_eq!(parse_default_literal("null"), Value::Null);
3998        assert_eq!(parse_default_literal("NULL"), Value::Null);
3999        assert_eq!(parse_default_literal("Null"), Value::Null);
4000    }
4001
4002    #[test]
4003    fn parse_default_literal_bool() {
4004        assert_eq!(parse_default_literal("true"), Value::Bool(true));
4005        assert_eq!(parse_default_literal("TRUE"), Value::Bool(true));
4006        assert_eq!(parse_default_literal("false"), Value::Bool(false));
4007        assert_eq!(parse_default_literal("FALSE"), Value::Bool(false));
4008    }
4009
4010    #[test]
4011    fn parse_default_literal_string_single_quoted() {
4012        assert_eq!(
4013            parse_default_literal("'hello'"),
4014            Value::String("hello".into())
4015        );
4016    }
4017
4018    #[test]
4019    fn parse_default_literal_string_double_quoted() {
4020        assert_eq!(
4021            parse_default_literal("\"world\""),
4022            Value::String("world".into())
4023        );
4024    }
4025
4026    #[test]
4027    fn parse_default_literal_integer() {
4028        assert_eq!(parse_default_literal("42"), Value::Int64(42));
4029        assert_eq!(parse_default_literal("-7"), Value::Int64(-7));
4030        assert_eq!(parse_default_literal("0"), Value::Int64(0));
4031    }
4032
4033    #[test]
4034    fn parse_default_literal_float() {
4035        assert_eq!(parse_default_literal("9.81"), Value::Float64(9.81_f64));
4036        assert_eq!(parse_default_literal("-0.5"), Value::Float64(-0.5));
4037    }
4038
4039    #[test]
4040    fn parse_default_literal_fallback_string() {
4041        // Not a recognized literal, not quoted, not a number
4042        assert_eq!(
4043            parse_default_literal("some_identifier"),
4044            Value::String("some_identifier".into())
4045        );
4046    }
4047
4048    #[test]
4049    fn test_session_create_node() {
4050        let db = GrafeoDB::new_in_memory();
4051        let session = db.session();
4052
4053        let id = session.create_node(&["Person"]);
4054        assert!(id.is_valid());
4055        assert_eq!(db.node_count(), 1);
4056    }
4057
4058    #[test]
4059    fn test_session_transaction() {
4060        let db = GrafeoDB::new_in_memory();
4061        let mut session = db.session();
4062
4063        assert!(!session.in_transaction());
4064
4065        session.begin_transaction().unwrap();
4066        assert!(session.in_transaction());
4067
4068        session.commit().unwrap();
4069        assert!(!session.in_transaction());
4070    }
4071
4072    #[test]
4073    fn test_session_transaction_context() {
4074        let db = GrafeoDB::new_in_memory();
4075        let mut session = db.session();
4076
4077        // Without transaction - context should have current epoch and no transaction_id
4078        let (_epoch1, transaction_id1) = session.get_transaction_context();
4079        assert!(transaction_id1.is_none());
4080
4081        // Start a transaction
4082        session.begin_transaction().unwrap();
4083        let (epoch2, transaction_id2) = session.get_transaction_context();
4084        assert!(transaction_id2.is_some());
4085        // Transaction should have a valid epoch
4086        let _ = epoch2; // Use the variable
4087
4088        // Commit and verify
4089        session.commit().unwrap();
4090        let (epoch3, tx_id3) = session.get_transaction_context();
4091        assert!(tx_id3.is_none());
4092        // Epoch should have advanced after commit
4093        assert!(epoch3.as_u64() >= epoch2.as_u64());
4094    }
4095
4096    #[test]
4097    fn test_session_rollback() {
4098        let db = GrafeoDB::new_in_memory();
4099        let mut session = db.session();
4100
4101        session.begin_transaction().unwrap();
4102        session.rollback().unwrap();
4103        assert!(!session.in_transaction());
4104    }
4105
4106    #[test]
4107    fn test_session_rollback_discards_versions() {
4108        use grafeo_common::types::TransactionId;
4109
4110        let db = GrafeoDB::new_in_memory();
4111
4112        // Create a node outside of any transaction (at system level)
4113        let node_before = db.store().create_node(&["Person"]);
4114        assert!(node_before.is_valid());
4115        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4116
4117        // Start a transaction
4118        let mut session = db.session();
4119        session.begin_transaction().unwrap();
4120        let transaction_id = session.current_transaction.lock().unwrap();
4121
4122        // Create a node versioned with the transaction's ID
4123        let epoch = db.store().current_epoch();
4124        let node_in_tx = db
4125            .store()
4126            .create_node_versioned(&["Person"], epoch, transaction_id);
4127        assert!(node_in_tx.is_valid());
4128
4129        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4130        // non-versioned lookups like node_count(). Verify the node is visible
4131        // only through the owning transaction.
4132        assert_eq!(
4133            db.node_count(),
4134            1,
4135            "PENDING nodes should be invisible to non-versioned node_count()"
4136        );
4137        assert!(
4138            db.store()
4139                .get_node_versioned(node_in_tx, epoch, transaction_id)
4140                .is_some(),
4141            "Transaction node should be visible to its own transaction"
4142        );
4143
4144        // Rollback the transaction
4145        session.rollback().unwrap();
4146        assert!(!session.in_transaction());
4147
4148        // The node created in the transaction should be discarded
4149        // Only the first node should remain visible
4150        let count_after = db.node_count();
4151        assert_eq!(
4152            count_after, 1,
4153            "Rollback should discard uncommitted node, but got {count_after}"
4154        );
4155
4156        // The original node should still be accessible
4157        let current_epoch = db.store().current_epoch();
4158        assert!(
4159            db.store()
4160                .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
4161                .is_some(),
4162            "Original node should still exist"
4163        );
4164
4165        // The node created in the transaction should not be accessible
4166        assert!(
4167            db.store()
4168                .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
4169                .is_none(),
4170            "Transaction node should be gone"
4171        );
4172    }
4173
4174    #[test]
4175    fn test_session_create_node_in_transaction() {
4176        // Test that session.create_node() is transaction-aware
4177        let db = GrafeoDB::new_in_memory();
4178
4179        // Create a node outside of any transaction
4180        let node_before = db.create_node(&["Person"]);
4181        assert!(node_before.is_valid());
4182        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4183
4184        // Start a transaction and create a node through the session
4185        let mut session = db.session();
4186        session.begin_transaction().unwrap();
4187        let transaction_id = session.current_transaction.lock().unwrap();
4188
4189        // Create a node through session.create_node() - should be versioned with tx
4190        let node_in_tx = session.create_node(&["Person"]);
4191        assert!(node_in_tx.is_valid());
4192
4193        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4194        // non-versioned lookups. Verify the node is visible only to its own tx.
4195        assert_eq!(
4196            db.node_count(),
4197            1,
4198            "PENDING nodes should be invisible to non-versioned node_count()"
4199        );
4200        let epoch = db.store().current_epoch();
4201        assert!(
4202            db.store()
4203                .get_node_versioned(node_in_tx, epoch, transaction_id)
4204                .is_some(),
4205            "Transaction node should be visible to its own transaction"
4206        );
4207
4208        // Rollback the transaction
4209        session.rollback().unwrap();
4210
4211        // The node created via session.create_node() should be discarded
4212        let count_after = db.node_count();
4213        assert_eq!(
4214            count_after, 1,
4215            "Rollback should discard node created via session.create_node(), but got {count_after}"
4216        );
4217    }
4218
4219    #[test]
4220    fn test_session_create_node_with_props_in_transaction() {
4221        use grafeo_common::types::Value;
4222
4223        // Test that session.create_node_with_props() is transaction-aware
4224        let db = GrafeoDB::new_in_memory();
4225
4226        // Create a node outside of any transaction
4227        db.create_node(&["Person"]);
4228        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4229
4230        // Start a transaction and create a node with properties
4231        let mut session = db.session();
4232        session.begin_transaction().unwrap();
4233        let transaction_id = session.current_transaction.lock().unwrap();
4234
4235        let node_in_tx =
4236            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4237        assert!(node_in_tx.is_valid());
4238
4239        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4240        // non-versioned lookups. Verify the node is visible only to its own tx.
4241        assert_eq!(
4242            db.node_count(),
4243            1,
4244            "PENDING nodes should be invisible to non-versioned node_count()"
4245        );
4246        let epoch = db.store().current_epoch();
4247        assert!(
4248            db.store()
4249                .get_node_versioned(node_in_tx, epoch, transaction_id)
4250                .is_some(),
4251            "Transaction node should be visible to its own transaction"
4252        );
4253
4254        // Rollback the transaction
4255        session.rollback().unwrap();
4256
4257        // The node should be discarded
4258        let count_after = db.node_count();
4259        assert_eq!(
4260            count_after, 1,
4261            "Rollback should discard node created via session.create_node_with_props()"
4262        );
4263    }
4264
4265    #[cfg(feature = "gql")]
4266    mod gql_tests {
4267        use super::*;
4268
4269        #[test]
4270        fn test_gql_query_execution() {
4271            let db = GrafeoDB::new_in_memory();
4272            let session = db.session();
4273
4274            // Create some test data
4275            session.create_node(&["Person"]);
4276            session.create_node(&["Person"]);
4277            session.create_node(&["Animal"]);
4278
4279            // Execute a GQL query
4280            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
4281
4282            // Should return 2 Person nodes
4283            assert_eq!(result.row_count(), 2);
4284            assert_eq!(result.column_count(), 1);
4285            assert_eq!(result.columns[0], "n");
4286        }
4287
4288        #[test]
4289        fn test_gql_empty_result() {
4290            let db = GrafeoDB::new_in_memory();
4291            let session = db.session();
4292
4293            // No data in database
4294            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
4295
4296            assert_eq!(result.row_count(), 0);
4297        }
4298
4299        #[test]
4300        fn test_gql_parse_error() {
4301            let db = GrafeoDB::new_in_memory();
4302            let session = db.session();
4303
4304            // Invalid GQL syntax
4305            let result = session.execute("MATCH (n RETURN n");
4306
4307            assert!(result.is_err());
4308        }
4309
4310        #[test]
4311        fn test_gql_relationship_traversal() {
4312            let db = GrafeoDB::new_in_memory();
4313            let session = db.session();
4314
4315            // Create a graph: Alix -> Gus, Alix -> Vincent
4316            let alix = session.create_node(&["Person"]);
4317            let gus = session.create_node(&["Person"]);
4318            let vincent = session.create_node(&["Person"]);
4319
4320            session.create_edge(alix, gus, "KNOWS");
4321            session.create_edge(alix, vincent, "KNOWS");
4322
4323            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
4324            let result = session
4325                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4326                .unwrap();
4327
4328            // Should return 2 rows (Alix->Gus, Alix->Vincent)
4329            assert_eq!(result.row_count(), 2);
4330            assert_eq!(result.column_count(), 2);
4331            assert_eq!(result.columns[0], "a");
4332            assert_eq!(result.columns[1], "b");
4333        }
4334
4335        #[test]
4336        fn test_gql_relationship_with_type_filter() {
4337            let db = GrafeoDB::new_in_memory();
4338            let session = db.session();
4339
4340            // Create a graph: Alix -KNOWS-> Gus, Alix -WORKS_WITH-> Vincent
4341            let alix = session.create_node(&["Person"]);
4342            let gus = session.create_node(&["Person"]);
4343            let vincent = session.create_node(&["Person"]);
4344
4345            session.create_edge(alix, gus, "KNOWS");
4346            session.create_edge(alix, vincent, "WORKS_WITH");
4347
4348            // Query only KNOWS relationships
4349            let result = session
4350                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4351                .unwrap();
4352
4353            // Should return only 1 row (Alix->Gus)
4354            assert_eq!(result.row_count(), 1);
4355        }
4356
4357        #[test]
4358        fn test_gql_semantic_error_undefined_variable() {
4359            let db = GrafeoDB::new_in_memory();
4360            let session = db.session();
4361
4362            // Reference undefined variable 'x' in RETURN
4363            let result = session.execute("MATCH (n:Person) RETURN x");
4364
4365            // Should fail with semantic error
4366            assert!(result.is_err());
4367            let Err(err) = result else {
4368                panic!("Expected error")
4369            };
4370            assert!(
4371                err.to_string().contains("Undefined variable"),
4372                "Expected undefined variable error, got: {}",
4373                err
4374            );
4375        }
4376
4377        #[test]
4378        fn test_gql_where_clause_property_filter() {
4379            use grafeo_common::types::Value;
4380
4381            let db = GrafeoDB::new_in_memory();
4382            let session = db.session();
4383
4384            // Create people with ages
4385            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
4386            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
4387            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
4388
4389            // Query with WHERE clause: age > 30
4390            let result = session
4391                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
4392                .unwrap();
4393
4394            // Should return 2 people (ages 35 and 45)
4395            assert_eq!(result.row_count(), 2);
4396        }
4397
4398        #[test]
4399        fn test_gql_where_clause_equality() {
4400            use grafeo_common::types::Value;
4401
4402            let db = GrafeoDB::new_in_memory();
4403            let session = db.session();
4404
4405            // Create people with names
4406            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4407            session.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
4408            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4409
4410            // Query with WHERE clause: name = "Alix"
4411            let result = session
4412                .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
4413                .unwrap();
4414
4415            // Should return 2 people named Alix
4416            assert_eq!(result.row_count(), 2);
4417        }
4418
4419        #[test]
4420        fn test_gql_return_property_access() {
4421            use grafeo_common::types::Value;
4422
4423            let db = GrafeoDB::new_in_memory();
4424            let session = db.session();
4425
4426            // Create people with names and ages
4427            session.create_node_with_props(
4428                &["Person"],
4429                [
4430                    ("name", Value::String("Alix".into())),
4431                    ("age", Value::Int64(30)),
4432                ],
4433            );
4434            session.create_node_with_props(
4435                &["Person"],
4436                [
4437                    ("name", Value::String("Gus".into())),
4438                    ("age", Value::Int64(25)),
4439                ],
4440            );
4441
4442            // Query returning properties
4443            let result = session
4444                .execute("MATCH (n:Person) RETURN n.name, n.age")
4445                .unwrap();
4446
4447            // Should return 2 rows with name and age columns
4448            assert_eq!(result.row_count(), 2);
4449            assert_eq!(result.column_count(), 2);
4450            assert_eq!(result.columns[0], "n.name");
4451            assert_eq!(result.columns[1], "n.age");
4452
4453            // Check that we get actual values
4454            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
4455            assert!(names.contains(&&Value::String("Alix".into())));
4456            assert!(names.contains(&&Value::String("Gus".into())));
4457        }
4458
4459        #[test]
4460        fn test_gql_return_mixed_expressions() {
4461            use grafeo_common::types::Value;
4462
4463            let db = GrafeoDB::new_in_memory();
4464            let session = db.session();
4465
4466            // Create a person
4467            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4468
4469            // Query returning both node and property
4470            let result = session
4471                .execute("MATCH (n:Person) RETURN n, n.name")
4472                .unwrap();
4473
4474            assert_eq!(result.row_count(), 1);
4475            assert_eq!(result.column_count(), 2);
4476            assert_eq!(result.columns[0], "n");
4477            assert_eq!(result.columns[1], "n.name");
4478
4479            // Second column should be the name
4480            assert_eq!(result.rows[0][1], Value::String("Alix".into()));
4481        }
4482    }
4483
4484    #[cfg(feature = "cypher")]
4485    mod cypher_tests {
4486        use super::*;
4487
4488        #[test]
4489        fn test_cypher_query_execution() {
4490            let db = GrafeoDB::new_in_memory();
4491            let session = db.session();
4492
4493            // Create some test data
4494            session.create_node(&["Person"]);
4495            session.create_node(&["Person"]);
4496            session.create_node(&["Animal"]);
4497
4498            // Execute a Cypher query
4499            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4500
4501            // Should return 2 Person nodes
4502            assert_eq!(result.row_count(), 2);
4503            assert_eq!(result.column_count(), 1);
4504            assert_eq!(result.columns[0], "n");
4505        }
4506
4507        #[test]
4508        fn test_cypher_empty_result() {
4509            let db = GrafeoDB::new_in_memory();
4510            let session = db.session();
4511
4512            // No data in database
4513            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4514
4515            assert_eq!(result.row_count(), 0);
4516        }
4517
4518        #[test]
4519        fn test_cypher_parse_error() {
4520            let db = GrafeoDB::new_in_memory();
4521            let session = db.session();
4522
4523            // Invalid Cypher syntax
4524            let result = session.execute_cypher("MATCH (n RETURN n");
4525
4526            assert!(result.is_err());
4527        }
4528    }
4529
4530    // ==================== Direct Lookup API Tests ====================
4531
4532    mod direct_lookup_tests {
4533        use super::*;
4534        use grafeo_common::types::Value;
4535
4536        #[test]
4537        fn test_get_node() {
4538            let db = GrafeoDB::new_in_memory();
4539            let session = db.session();
4540
4541            let id = session.create_node(&["Person"]);
4542            let node = session.get_node(id);
4543
4544            assert!(node.is_some());
4545            let node = node.unwrap();
4546            assert_eq!(node.id, id);
4547        }
4548
4549        #[test]
4550        fn test_get_node_not_found() {
4551            use grafeo_common::types::NodeId;
4552
4553            let db = GrafeoDB::new_in_memory();
4554            let session = db.session();
4555
4556            // Try to get a non-existent node
4557            let node = session.get_node(NodeId::new(9999));
4558            assert!(node.is_none());
4559        }
4560
4561        #[test]
4562        fn test_get_node_property() {
4563            let db = GrafeoDB::new_in_memory();
4564            let session = db.session();
4565
4566            let id = session
4567                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4568
4569            let name = session.get_node_property(id, "name");
4570            assert_eq!(name, Some(Value::String("Alix".into())));
4571
4572            // Non-existent property
4573            let missing = session.get_node_property(id, "missing");
4574            assert!(missing.is_none());
4575        }
4576
4577        #[test]
4578        fn test_get_edge() {
4579            let db = GrafeoDB::new_in_memory();
4580            let session = db.session();
4581
4582            let alix = session.create_node(&["Person"]);
4583            let gus = session.create_node(&["Person"]);
4584            let edge_id = session.create_edge(alix, gus, "KNOWS");
4585
4586            let edge = session.get_edge(edge_id);
4587            assert!(edge.is_some());
4588            let edge = edge.unwrap();
4589            assert_eq!(edge.id, edge_id);
4590            assert_eq!(edge.src, alix);
4591            assert_eq!(edge.dst, gus);
4592        }
4593
4594        #[test]
4595        fn test_get_edge_not_found() {
4596            use grafeo_common::types::EdgeId;
4597
4598            let db = GrafeoDB::new_in_memory();
4599            let session = db.session();
4600
4601            let edge = session.get_edge(EdgeId::new(9999));
4602            assert!(edge.is_none());
4603        }
4604
4605        #[test]
4606        fn test_get_neighbors_outgoing() {
4607            let db = GrafeoDB::new_in_memory();
4608            let session = db.session();
4609
4610            let alix = session.create_node(&["Person"]);
4611            let gus = session.create_node(&["Person"]);
4612            let harm = session.create_node(&["Person"]);
4613
4614            session.create_edge(alix, gus, "KNOWS");
4615            session.create_edge(alix, harm, "KNOWS");
4616
4617            let neighbors = session.get_neighbors_outgoing(alix);
4618            assert_eq!(neighbors.len(), 2);
4619
4620            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4621            assert!(neighbor_ids.contains(&gus));
4622            assert!(neighbor_ids.contains(&harm));
4623        }
4624
4625        #[test]
4626        fn test_get_neighbors_incoming() {
4627            let db = GrafeoDB::new_in_memory();
4628            let session = db.session();
4629
4630            let alix = session.create_node(&["Person"]);
4631            let gus = session.create_node(&["Person"]);
4632            let harm = session.create_node(&["Person"]);
4633
4634            session.create_edge(gus, alix, "KNOWS");
4635            session.create_edge(harm, alix, "KNOWS");
4636
4637            let neighbors = session.get_neighbors_incoming(alix);
4638            assert_eq!(neighbors.len(), 2);
4639
4640            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4641            assert!(neighbor_ids.contains(&gus));
4642            assert!(neighbor_ids.contains(&harm));
4643        }
4644
4645        #[test]
4646        fn test_get_neighbors_outgoing_by_type() {
4647            let db = GrafeoDB::new_in_memory();
4648            let session = db.session();
4649
4650            let alix = session.create_node(&["Person"]);
4651            let gus = session.create_node(&["Person"]);
4652            let company = session.create_node(&["Company"]);
4653
4654            session.create_edge(alix, gus, "KNOWS");
4655            session.create_edge(alix, company, "WORKS_AT");
4656
4657            let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
4658            assert_eq!(knows_neighbors.len(), 1);
4659            assert_eq!(knows_neighbors[0].0, gus);
4660
4661            let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
4662            assert_eq!(works_neighbors.len(), 1);
4663            assert_eq!(works_neighbors[0].0, company);
4664
4665            // No edges of this type
4666            let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
4667            assert!(no_neighbors.is_empty());
4668        }
4669
4670        #[test]
4671        fn test_node_exists() {
4672            use grafeo_common::types::NodeId;
4673
4674            let db = GrafeoDB::new_in_memory();
4675            let session = db.session();
4676
4677            let id = session.create_node(&["Person"]);
4678
4679            assert!(session.node_exists(id));
4680            assert!(!session.node_exists(NodeId::new(9999)));
4681        }
4682
4683        #[test]
4684        fn test_edge_exists() {
4685            use grafeo_common::types::EdgeId;
4686
4687            let db = GrafeoDB::new_in_memory();
4688            let session = db.session();
4689
4690            let alix = session.create_node(&["Person"]);
4691            let gus = session.create_node(&["Person"]);
4692            let edge_id = session.create_edge(alix, gus, "KNOWS");
4693
4694            assert!(session.edge_exists(edge_id));
4695            assert!(!session.edge_exists(EdgeId::new(9999)));
4696        }
4697
4698        #[test]
4699        fn test_get_degree() {
4700            let db = GrafeoDB::new_in_memory();
4701            let session = db.session();
4702
4703            let alix = session.create_node(&["Person"]);
4704            let gus = session.create_node(&["Person"]);
4705            let harm = session.create_node(&["Person"]);
4706
4707            // Alix knows Gus and Harm (2 outgoing)
4708            session.create_edge(alix, gus, "KNOWS");
4709            session.create_edge(alix, harm, "KNOWS");
4710            // Gus knows Alix (1 incoming for Alix)
4711            session.create_edge(gus, alix, "KNOWS");
4712
4713            let (out_degree, in_degree) = session.get_degree(alix);
4714            assert_eq!(out_degree, 2);
4715            assert_eq!(in_degree, 1);
4716
4717            // Node with no edges
4718            let lonely = session.create_node(&["Person"]);
4719            let (out, in_deg) = session.get_degree(lonely);
4720            assert_eq!(out, 0);
4721            assert_eq!(in_deg, 0);
4722        }
4723
4724        #[test]
4725        fn test_get_nodes_batch() {
4726            let db = GrafeoDB::new_in_memory();
4727            let session = db.session();
4728
4729            let alix = session.create_node(&["Person"]);
4730            let gus = session.create_node(&["Person"]);
4731            let harm = session.create_node(&["Person"]);
4732
4733            let nodes = session.get_nodes_batch(&[alix, gus, harm]);
4734            assert_eq!(nodes.len(), 3);
4735            assert!(nodes[0].is_some());
4736            assert!(nodes[1].is_some());
4737            assert!(nodes[2].is_some());
4738
4739            // With non-existent node
4740            use grafeo_common::types::NodeId;
4741            let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
4742            assert_eq!(nodes_with_missing.len(), 3);
4743            assert!(nodes_with_missing[0].is_some());
4744            assert!(nodes_with_missing[1].is_none()); // Missing node
4745            assert!(nodes_with_missing[2].is_some());
4746        }
4747
4748        #[test]
4749        fn test_auto_commit_setting() {
4750            let db = GrafeoDB::new_in_memory();
4751            let mut session = db.session();
4752
4753            // Default is auto-commit enabled
4754            assert!(session.auto_commit());
4755
4756            session.set_auto_commit(false);
4757            assert!(!session.auto_commit());
4758
4759            session.set_auto_commit(true);
4760            assert!(session.auto_commit());
4761        }
4762
4763        #[test]
4764        fn test_transaction_double_begin_nests() {
4765            let db = GrafeoDB::new_in_memory();
4766            let mut session = db.session();
4767
4768            session.begin_transaction().unwrap();
4769            // Second begin_transaction creates a nested transaction (auto-savepoint)
4770            let result = session.begin_transaction();
4771            assert!(result.is_ok());
4772            // Commit the inner (releases savepoint)
4773            session.commit().unwrap();
4774            // Commit the outer
4775            session.commit().unwrap();
4776        }
4777
4778        #[test]
4779        fn test_commit_without_transaction_error() {
4780            let db = GrafeoDB::new_in_memory();
4781            let mut session = db.session();
4782
4783            let result = session.commit();
4784            assert!(result.is_err());
4785        }
4786
4787        #[test]
4788        fn test_rollback_without_transaction_error() {
4789            let db = GrafeoDB::new_in_memory();
4790            let mut session = db.session();
4791
4792            let result = session.rollback();
4793            assert!(result.is_err());
4794        }
4795
4796        #[test]
4797        fn test_create_edge_in_transaction() {
4798            let db = GrafeoDB::new_in_memory();
4799            let mut session = db.session();
4800
4801            // Create nodes outside transaction
4802            let alix = session.create_node(&["Person"]);
4803            let gus = session.create_node(&["Person"]);
4804
4805            // Create edge in transaction
4806            session.begin_transaction().unwrap();
4807            let edge_id = session.create_edge(alix, gus, "KNOWS");
4808
4809            // Edge should be visible in the transaction
4810            assert!(session.edge_exists(edge_id));
4811
4812            // Commit
4813            session.commit().unwrap();
4814
4815            // Edge should still be visible
4816            assert!(session.edge_exists(edge_id));
4817        }
4818
4819        #[test]
4820        fn test_neighbors_empty_node() {
4821            let db = GrafeoDB::new_in_memory();
4822            let session = db.session();
4823
4824            let lonely = session.create_node(&["Person"]);
4825
4826            assert!(session.get_neighbors_outgoing(lonely).is_empty());
4827            assert!(session.get_neighbors_incoming(lonely).is_empty());
4828            assert!(
4829                session
4830                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
4831                    .is_empty()
4832            );
4833        }
4834    }
4835
4836    #[test]
4837    fn test_auto_gc_triggers_on_commit_interval() {
4838        use crate::config::Config;
4839
4840        let config = Config::in_memory().with_gc_interval(2);
4841        let db = GrafeoDB::with_config(config).unwrap();
4842        let mut session = db.session();
4843
4844        // First commit: counter = 1, no GC (not a multiple of 2)
4845        session.begin_transaction().unwrap();
4846        session.create_node(&["A"]);
4847        session.commit().unwrap();
4848
4849        // Second commit: counter = 2, GC should trigger (multiple of 2)
4850        session.begin_transaction().unwrap();
4851        session.create_node(&["B"]);
4852        session.commit().unwrap();
4853
4854        // Verify the database is still functional after GC
4855        assert_eq!(db.node_count(), 2);
4856    }
4857
4858    #[test]
4859    fn test_query_timeout_config_propagates_to_session() {
4860        use crate::config::Config;
4861        use std::time::Duration;
4862
4863        let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
4864        let db = GrafeoDB::with_config(config).unwrap();
4865        let session = db.session();
4866
4867        // Verify the session has a query deadline (timeout was set)
4868        assert!(session.query_deadline().is_some());
4869    }
4870
4871    #[test]
4872    fn test_no_query_timeout_returns_no_deadline() {
4873        let db = GrafeoDB::new_in_memory();
4874        let session = db.session();
4875
4876        // Default config has no timeout
4877        assert!(session.query_deadline().is_none());
4878    }
4879
4880    #[test]
4881    fn test_graph_model_accessor() {
4882        use crate::config::GraphModel;
4883
4884        let db = GrafeoDB::new_in_memory();
4885        let session = db.session();
4886
4887        assert_eq!(session.graph_model(), GraphModel::Lpg);
4888    }
4889
4890    #[cfg(feature = "gql")]
4891    #[test]
4892    fn test_external_store_session() {
4893        use grafeo_core::graph::GraphStoreMut;
4894        use std::sync::Arc;
4895
4896        let config = crate::config::Config::in_memory();
4897        let store =
4898            Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
4899        let db = GrafeoDB::with_store(store, config).unwrap();
4900
4901        let mut session = db.session();
4902
4903        // Use an explicit transaction so that INSERT and MATCH share the same
4904        // transaction context. With PENDING epochs, uncommitted versions are
4905        // only visible to the owning transaction.
4906        session.begin_transaction().unwrap();
4907        session.execute("INSERT (:Test {name: 'hello'})").unwrap();
4908
4909        // Verify we can query through it within the same transaction
4910        let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
4911        assert_eq!(result.row_count(), 1);
4912
4913        session.commit().unwrap();
4914    }
4915
4916    // ==================== Session Command Tests ====================
4917
4918    #[cfg(feature = "gql")]
4919    mod session_command_tests {
4920        use super::*;
4921        use grafeo_common::types::Value;
4922
4923        #[test]
4924        fn test_use_graph_sets_current_graph() {
4925            let db = GrafeoDB::new_in_memory();
4926            let session = db.session();
4927
4928            // Create the graph first, then USE it
4929            session.execute("CREATE GRAPH mydb").unwrap();
4930            session.execute("USE GRAPH mydb").unwrap();
4931
4932            assert_eq!(session.current_graph(), Some("mydb".to_string()));
4933        }
4934
4935        #[test]
4936        fn test_use_graph_nonexistent_errors() {
4937            let db = GrafeoDB::new_in_memory();
4938            let session = db.session();
4939
4940            let result = session.execute("USE GRAPH doesnotexist");
4941            assert!(result.is_err());
4942            let err = result.unwrap_err().to_string();
4943            assert!(
4944                err.contains("does not exist"),
4945                "Expected 'does not exist' error, got: {err}"
4946            );
4947        }
4948
4949        #[test]
4950        fn test_use_graph_default_always_valid() {
4951            let db = GrafeoDB::new_in_memory();
4952            let session = db.session();
4953
4954            // "default" is always valid, even without CREATE GRAPH
4955            session.execute("USE GRAPH default").unwrap();
4956            assert_eq!(session.current_graph(), Some("default".to_string()));
4957        }
4958
4959        #[test]
4960        fn test_session_set_graph() {
4961            let db = GrafeoDB::new_in_memory();
4962            let session = db.session();
4963
4964            session.execute("CREATE GRAPH analytics").unwrap();
4965            session.execute("SESSION SET GRAPH analytics").unwrap();
4966            assert_eq!(session.current_graph(), Some("analytics".to_string()));
4967        }
4968
4969        #[test]
4970        fn test_session_set_graph_nonexistent_errors() {
4971            let db = GrafeoDB::new_in_memory();
4972            let session = db.session();
4973
4974            let result = session.execute("SESSION SET GRAPH nosuchgraph");
4975            assert!(result.is_err());
4976        }
4977
4978        #[test]
4979        fn test_session_set_time_zone() {
4980            let db = GrafeoDB::new_in_memory();
4981            let session = db.session();
4982
4983            assert_eq!(session.time_zone(), None);
4984
4985            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
4986            assert_eq!(session.time_zone(), Some("UTC".to_string()));
4987
4988            session
4989                .execute("SESSION SET TIME ZONE 'America/New_York'")
4990                .unwrap();
4991            assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
4992        }
4993
4994        #[test]
4995        fn test_session_set_parameter() {
4996            let db = GrafeoDB::new_in_memory();
4997            let session = db.session();
4998
4999            session
5000                .execute("SESSION SET PARAMETER $timeout = 30")
5001                .unwrap();
5002
5003            // Parameter is stored (value is Null for now, since expression
5004            // evaluation is not yet wired up)
5005            assert!(session.get_parameter("timeout").is_some());
5006        }
5007
5008        #[test]
5009        fn test_session_reset_clears_all_state() {
5010            let db = GrafeoDB::new_in_memory();
5011            let session = db.session();
5012
5013            // Set various session state
5014            session.execute("CREATE GRAPH analytics").unwrap();
5015            session.execute("SESSION SET GRAPH analytics").unwrap();
5016            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5017            session
5018                .execute("SESSION SET PARAMETER $limit = 100")
5019                .unwrap();
5020
5021            // Verify state was set
5022            assert!(session.current_graph().is_some());
5023            assert!(session.time_zone().is_some());
5024            assert!(session.get_parameter("limit").is_some());
5025
5026            // Reset everything
5027            session.execute("SESSION RESET").unwrap();
5028
5029            assert_eq!(session.current_graph(), None);
5030            assert_eq!(session.time_zone(), None);
5031            assert!(session.get_parameter("limit").is_none());
5032        }
5033
5034        #[test]
5035        fn test_session_close_clears_state() {
5036            let db = GrafeoDB::new_in_memory();
5037            let session = db.session();
5038
5039            session.execute("CREATE GRAPH analytics").unwrap();
5040            session.execute("SESSION SET GRAPH analytics").unwrap();
5041            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5042
5043            session.execute("SESSION CLOSE").unwrap();
5044
5045            assert_eq!(session.current_graph(), None);
5046            assert_eq!(session.time_zone(), None);
5047        }
5048
5049        #[test]
5050        fn test_create_graph() {
5051            let db = GrafeoDB::new_in_memory();
5052            let session = db.session();
5053
5054            session.execute("CREATE GRAPH mydb").unwrap();
5055
5056            // Should be able to USE it now
5057            session.execute("USE GRAPH mydb").unwrap();
5058            assert_eq!(session.current_graph(), Some("mydb".to_string()));
5059        }
5060
5061        #[test]
5062        fn test_create_graph_duplicate_errors() {
5063            let db = GrafeoDB::new_in_memory();
5064            let session = db.session();
5065
5066            session.execute("CREATE GRAPH mydb").unwrap();
5067            let result = session.execute("CREATE GRAPH mydb");
5068
5069            assert!(result.is_err());
5070            let err = result.unwrap_err().to_string();
5071            assert!(
5072                err.contains("already exists"),
5073                "Expected 'already exists' error, got: {err}"
5074            );
5075        }
5076
5077        #[test]
5078        fn test_create_graph_if_not_exists() {
5079            let db = GrafeoDB::new_in_memory();
5080            let session = db.session();
5081
5082            session.execute("CREATE GRAPH mydb").unwrap();
5083            // Should succeed silently with IF NOT EXISTS
5084            session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
5085        }
5086
5087        #[test]
5088        fn test_drop_graph() {
5089            let db = GrafeoDB::new_in_memory();
5090            let session = db.session();
5091
5092            session.execute("CREATE GRAPH mydb").unwrap();
5093            session.execute("DROP GRAPH mydb").unwrap();
5094
5095            // Should no longer be usable
5096            let result = session.execute("USE GRAPH mydb");
5097            assert!(result.is_err());
5098        }
5099
5100        #[test]
5101        fn test_drop_graph_nonexistent_errors() {
5102            let db = GrafeoDB::new_in_memory();
5103            let session = db.session();
5104
5105            let result = session.execute("DROP GRAPH nosuchgraph");
5106            assert!(result.is_err());
5107            let err = result.unwrap_err().to_string();
5108            assert!(
5109                err.contains("does not exist"),
5110                "Expected 'does not exist' error, got: {err}"
5111            );
5112        }
5113
5114        #[test]
5115        fn test_drop_graph_if_exists() {
5116            let db = GrafeoDB::new_in_memory();
5117            let session = db.session();
5118
5119            // Should succeed silently with IF EXISTS
5120            session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
5121        }
5122
5123        #[test]
5124        fn test_start_transaction_via_gql() {
5125            let db = GrafeoDB::new_in_memory();
5126            let session = db.session();
5127
5128            session.execute("START TRANSACTION").unwrap();
5129            assert!(session.in_transaction());
5130            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5131            session.execute("COMMIT").unwrap();
5132            assert!(!session.in_transaction());
5133
5134            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5135            assert_eq!(result.rows.len(), 1);
5136        }
5137
5138        #[test]
5139        fn test_start_transaction_read_only_blocks_insert() {
5140            let db = GrafeoDB::new_in_memory();
5141            let session = db.session();
5142
5143            session.execute("START TRANSACTION READ ONLY").unwrap();
5144            let result = session.execute("INSERT (:Person {name: 'Alix'})");
5145            assert!(result.is_err());
5146            let err = result.unwrap_err().to_string();
5147            assert!(
5148                err.contains("read-only"),
5149                "Expected read-only error, got: {err}"
5150            );
5151            session.execute("ROLLBACK").unwrap();
5152        }
5153
5154        #[test]
5155        fn test_start_transaction_read_only_allows_reads() {
5156            let db = GrafeoDB::new_in_memory();
5157            let mut session = db.session();
5158            session.begin_transaction().unwrap();
5159            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5160            session.commit().unwrap();
5161
5162            session.execute("START TRANSACTION READ ONLY").unwrap();
5163            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5164            assert_eq!(result.rows.len(), 1);
5165            session.execute("COMMIT").unwrap();
5166        }
5167
5168        #[test]
5169        fn test_rollback_via_gql() {
5170            let db = GrafeoDB::new_in_memory();
5171            let session = db.session();
5172
5173            session.execute("START TRANSACTION").unwrap();
5174            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5175            session.execute("ROLLBACK").unwrap();
5176
5177            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5178            assert!(result.rows.is_empty());
5179        }
5180
5181        #[test]
5182        fn test_start_transaction_with_isolation_level() {
5183            let db = GrafeoDB::new_in_memory();
5184            let session = db.session();
5185
5186            session
5187                .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
5188                .unwrap();
5189            assert!(session.in_transaction());
5190            session.execute("ROLLBACK").unwrap();
5191        }
5192
5193        #[test]
5194        fn test_session_commands_return_empty_result() {
5195            let db = GrafeoDB::new_in_memory();
5196            let session = db.session();
5197
5198            session.execute("CREATE GRAPH test").unwrap();
5199            let result = session.execute("SESSION SET GRAPH test").unwrap();
5200            assert_eq!(result.row_count(), 0);
5201            assert_eq!(result.column_count(), 0);
5202        }
5203
5204        #[test]
5205        fn test_current_graph_default_is_none() {
5206            let db = GrafeoDB::new_in_memory();
5207            let session = db.session();
5208
5209            assert_eq!(session.current_graph(), None);
5210        }
5211
5212        #[test]
5213        fn test_time_zone_default_is_none() {
5214            let db = GrafeoDB::new_in_memory();
5215            let session = db.session();
5216
5217            assert_eq!(session.time_zone(), None);
5218        }
5219
5220        #[test]
5221        fn test_session_state_independent_across_sessions() {
5222            let db = GrafeoDB::new_in_memory();
5223            let session1 = db.session();
5224            let session2 = db.session();
5225
5226            session1.execute("CREATE GRAPH first").unwrap();
5227            session1.execute("CREATE GRAPH second").unwrap();
5228            session1.execute("SESSION SET GRAPH first").unwrap();
5229            session2.execute("SESSION SET GRAPH second").unwrap();
5230
5231            assert_eq!(session1.current_graph(), Some("first".to_string()));
5232            assert_eq!(session2.current_graph(), Some("second".to_string()));
5233        }
5234
5235        #[test]
5236        fn test_show_node_types() {
5237            let db = GrafeoDB::new_in_memory();
5238            let session = db.session();
5239
5240            session
5241                .execute("CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)")
5242                .unwrap();
5243
5244            let result = session.execute("SHOW NODE TYPES").unwrap();
5245            assert_eq!(
5246                result.columns,
5247                vec!["name", "properties", "constraints", "parents"]
5248            );
5249            assert_eq!(result.rows.len(), 1);
5250            // First column is the type name
5251            assert_eq!(result.rows[0][0], Value::from("Person"));
5252        }
5253
5254        #[test]
5255        fn test_show_edge_types() {
5256            let db = GrafeoDB::new_in_memory();
5257            let session = db.session();
5258
5259            session
5260                .execute("CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)")
5261                .unwrap();
5262
5263            let result = session.execute("SHOW EDGE TYPES").unwrap();
5264            assert_eq!(
5265                result.columns,
5266                vec!["name", "properties", "source_types", "target_types"]
5267            );
5268            assert_eq!(result.rows.len(), 1);
5269            assert_eq!(result.rows[0][0], Value::from("KNOWS"));
5270        }
5271
5272        #[test]
5273        fn test_show_graph_types() {
5274            let db = GrafeoDB::new_in_memory();
5275            let session = db.session();
5276
5277            session
5278                .execute("CREATE NODE TYPE Person (name STRING)")
5279                .unwrap();
5280            session
5281                .execute(
5282                    "CREATE GRAPH TYPE social (\
5283                        NODE TYPE Person (name STRING)\
5284                    )",
5285                )
5286                .unwrap();
5287
5288            let result = session.execute("SHOW GRAPH TYPES").unwrap();
5289            assert_eq!(
5290                result.columns,
5291                vec!["name", "open", "node_types", "edge_types"]
5292            );
5293            assert_eq!(result.rows.len(), 1);
5294            assert_eq!(result.rows[0][0], Value::from("social"));
5295        }
5296
5297        #[test]
5298        fn test_show_graph_type_named() {
5299            let db = GrafeoDB::new_in_memory();
5300            let session = db.session();
5301
5302            session
5303                .execute("CREATE NODE TYPE Person (name STRING)")
5304                .unwrap();
5305            session
5306                .execute(
5307                    "CREATE GRAPH TYPE social (\
5308                        NODE TYPE Person (name STRING)\
5309                    )",
5310                )
5311                .unwrap();
5312
5313            let result = session.execute("SHOW GRAPH TYPE social").unwrap();
5314            assert_eq!(result.rows.len(), 1);
5315            assert_eq!(result.rows[0][0], Value::from("social"));
5316        }
5317
5318        #[test]
5319        fn test_show_graph_type_not_found() {
5320            let db = GrafeoDB::new_in_memory();
5321            let session = db.session();
5322
5323            let result = session.execute("SHOW GRAPH TYPE nonexistent");
5324            assert!(result.is_err());
5325        }
5326
5327        #[test]
5328        fn test_show_indexes_via_gql() {
5329            let db = GrafeoDB::new_in_memory();
5330            let session = db.session();
5331
5332            let result = session.execute("SHOW INDEXES").unwrap();
5333            assert_eq!(result.columns, vec!["name", "type", "label", "property"]);
5334        }
5335
5336        #[test]
5337        fn test_show_constraints_via_gql() {
5338            let db = GrafeoDB::new_in_memory();
5339            let session = db.session();
5340
5341            let result = session.execute("SHOW CONSTRAINTS").unwrap();
5342            assert_eq!(result.columns, vec!["name", "type", "label", "properties"]);
5343        }
5344
5345        #[test]
5346        fn test_pattern_form_graph_type_roundtrip() {
5347            let db = GrafeoDB::new_in_memory();
5348            let session = db.session();
5349
5350            // Register the types first
5351            session
5352                .execute("CREATE NODE TYPE Person (name STRING NOT NULL)")
5353                .unwrap();
5354            session
5355                .execute("CREATE NODE TYPE City (name STRING)")
5356                .unwrap();
5357            session
5358                .execute("CREATE EDGE TYPE KNOWS (since INTEGER)")
5359                .unwrap();
5360            session.execute("CREATE EDGE TYPE LIVES_IN").unwrap();
5361
5362            // Create graph type using pattern form
5363            session
5364                .execute(
5365                    "CREATE GRAPH TYPE social (\
5366                        (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
5367                        (:Person)-[:LIVES_IN]->(:City)\
5368                    )",
5369                )
5370                .unwrap();
5371
5372            // Verify it was created
5373            let result = session.execute("SHOW GRAPH TYPE social").unwrap();
5374            assert_eq!(result.rows.len(), 1);
5375            assert_eq!(result.rows[0][0], Value::from("social"));
5376        }
5377    }
5378}