Skip to main content

grafeo_engine/session/
mod.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7#[cfg(feature = "rdf")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::time::{Duration, Instant};
13
14use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId, Value};
15use grafeo_common::utils::error::Result;
16use grafeo_common::{grafeo_debug_span, grafeo_info_span, grafeo_warn};
17use grafeo_core::graph::Direction;
18use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
19#[cfg(feature = "rdf")]
20use grafeo_core::graph::rdf::RdfStore;
21use grafeo_core::graph::{GraphStore, GraphStoreMut};
22
23use crate::catalog::{Catalog, CatalogConstraintValidator};
24use crate::config::{AdaptiveConfig, GraphModel};
25use crate::database::QueryResult;
26use crate::query::cache::QueryCache;
27use crate::transaction::TransactionManager;
28
29/// Parses a DDL default-value literal string into a [`Value`].
30///
31/// Handles string literals (single- or double-quoted), integers, floats,
32/// booleans (`true`/`false`), and `NULL`.
33fn parse_default_literal(text: &str) -> Value {
34    if text.eq_ignore_ascii_case("null") {
35        return Value::Null;
36    }
37    if text.eq_ignore_ascii_case("true") {
38        return Value::Bool(true);
39    }
40    if text.eq_ignore_ascii_case("false") {
41        return Value::Bool(false);
42    }
43    // String literal: strip surrounding quotes
44    if (text.starts_with('\'') && text.ends_with('\''))
45        || (text.starts_with('"') && text.ends_with('"'))
46    {
47        return Value::String(text[1..text.len() - 1].into());
48    }
49    // Try integer, then float
50    if let Ok(i) = text.parse::<i64>() {
51        return Value::Int64(i);
52    }
53    if let Ok(f) = text.parse::<f64>() {
54        return Value::Float64(f);
55    }
56    // Fallback: treat as string
57    Value::String(text.into())
58}
59
60/// Runtime configuration for creating a new session.
61///
62/// Groups the shared parameters passed to all session constructors, keeping
63/// call sites readable and avoiding long argument lists.
64pub(crate) struct SessionConfig {
65    pub transaction_manager: Arc<TransactionManager>,
66    pub query_cache: Arc<QueryCache>,
67    pub catalog: Arc<Catalog>,
68    pub adaptive_config: AdaptiveConfig,
69    pub factorized_execution: bool,
70    pub graph_model: GraphModel,
71    pub query_timeout: Option<Duration>,
72    pub commit_counter: Arc<AtomicUsize>,
73    pub gc_interval: usize,
74    /// When true, the session permanently blocks all mutations.
75    pub read_only: bool,
76}
77
78/// Your handle to the database - execute queries and manage transactions.
79///
80/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
81/// tracks its own transaction state, so you can have multiple concurrent
82/// sessions without them interfering.
83pub struct Session {
84    /// The underlying store.
85    store: Arc<LpgStore>,
86    /// Graph store trait object for pluggable storage backends (read path).
87    graph_store: Arc<dyn GraphStore>,
88    /// Writable graph store (None for read-only databases).
89    graph_store_mut: Option<Arc<dyn GraphStoreMut>>,
90    /// Schema and metadata catalog shared across sessions.
91    catalog: Arc<Catalog>,
92    /// RDF triple store (if RDF feature is enabled).
93    #[cfg(feature = "rdf")]
94    rdf_store: Arc<RdfStore>,
95    /// Transaction manager.
96    transaction_manager: Arc<TransactionManager>,
97    /// Query cache shared across sessions.
98    query_cache: Arc<QueryCache>,
99    /// Current transaction ID (if any). Behind a Mutex so that GQL commands
100    /// (`START TRANSACTION`, `COMMIT`, `ROLLBACK`) can manage transactions
101    /// from within `execute(&self)`.
102    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
103    /// Whether the current transaction is read-only (blocks mutations).
104    read_only_tx: parking_lot::Mutex<bool>,
105    /// Whether the database itself is read-only (set at open time, never changes).
106    /// When true, `read_only_tx` is always true regardless of transaction flags.
107    db_read_only: bool,
108    /// Whether the session is in auto-commit mode.
109    auto_commit: bool,
110    /// Adaptive execution configuration.
111    #[allow(dead_code)] // Stored for future adaptive re-optimization during execution
112    adaptive_config: AdaptiveConfig,
113    /// Whether to use factorized execution for multi-hop queries.
114    factorized_execution: bool,
115    /// The graph data model this session operates on.
116    graph_model: GraphModel,
117    /// Maximum time a query may run before being cancelled.
118    query_timeout: Option<Duration>,
119    /// Shared commit counter for triggering auto-GC.
120    commit_counter: Arc<AtomicUsize>,
121    /// GC every N commits (0 = disabled).
122    gc_interval: usize,
123    /// Node count at the start of the current transaction (for PreparedCommit stats).
124    transaction_start_node_count: AtomicUsize,
125    /// Edge count at the start of the current transaction (for PreparedCommit stats).
126    transaction_start_edge_count: AtomicUsize,
127    /// WAL for logging schema changes.
128    #[cfg(feature = "wal")]
129    wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
130    /// Shared WAL graph context tracker for named graph awareness.
131    #[cfg(feature = "wal")]
132    wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
133    /// CDC log for change tracking.
134    #[cfg(feature = "cdc")]
135    cdc_log: Arc<crate::cdc::CdcLog>,
136    /// Current graph name (for multi-graph USE GRAPH support). None = default graph.
137    current_graph: parking_lot::Mutex<Option<String>>,
138    /// Current schema name (ISO/IEC 39075 Section 4.7.3: independent from session graph).
139    /// None = "not set" (uses default schema).
140    current_schema: parking_lot::Mutex<Option<String>>,
141    /// Session time zone override.
142    time_zone: parking_lot::Mutex<Option<String>>,
143    /// Session-level parameters (SET PARAMETER).
144    session_params:
145        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
146    /// Override epoch for time-travel queries (None = use transaction/current epoch).
147    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
148    /// Savepoints within the current transaction.
149    savepoints: parking_lot::Mutex<Vec<SavepointState>>,
150    /// Nesting depth for nested transactions (0 = outermost).
151    /// Nested `START TRANSACTION` creates an auto-savepoint; nested `COMMIT`
152    /// releases it, nested `ROLLBACK` rolls back to it.
153    transaction_nesting_depth: parking_lot::Mutex<u32>,
154    /// Named graphs touched during the current transaction (for cross-graph atomicity).
155    /// `None` represents the default graph. Populated at `BEGIN` time and on each
156    /// `USE GRAPH` / `SESSION SET GRAPH` switch within a transaction.
157    touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
158    /// Shared metrics registry (populated when the `metrics` feature is enabled).
159    #[cfg(feature = "metrics")]
160    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
161    /// Transaction start time for duration tracking.
162    #[cfg(feature = "metrics")]
163    tx_start_time: parking_lot::Mutex<Option<Instant>>,
164}
165
/// Snapshot of a single graph's store state taken when a savepoint is created.
#[derive(Clone)]
struct GraphSavepoint {
    /// Graph this snapshot belongs to.
    // NOTE(review): `None` presumably denotes the default graph, as in
    // `Session::touched_graphs` - confirm against the savepoint code.
    graph_name: Option<String>,
    /// Next node ID of the store at snapshot time.
    next_node_id: u64,
    /// Next edge ID of the store at snapshot time.
    next_edge_id: u64,
    /// Position in the undo log at snapshot time.
    undo_log_position: usize,
}
174
175/// Savepoint state: name + per-graph snapshots + the graph that was active.
176#[derive(Clone)]
177struct SavepointState {
178    name: String,
179    graph_snapshots: Vec<GraphSavepoint>,
180    /// The graph that was active when the savepoint was created.
181    /// Reserved for future use (e.g., restoring graph context on rollback).
182    #[allow(dead_code)]
183    active_graph: Option<String>,
184}
185
186impl Session {
187    /// Creates a new session with adaptive execution configuration.
188    #[allow(dead_code)]
189    pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
190        let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
191        let graph_store_mut = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
192        Self {
193            store,
194            graph_store,
195            graph_store_mut,
196            catalog: cfg.catalog,
197            #[cfg(feature = "rdf")]
198            rdf_store: Arc::new(RdfStore::new()),
199            transaction_manager: cfg.transaction_manager,
200            query_cache: cfg.query_cache,
201            current_transaction: parking_lot::Mutex::new(None),
202            read_only_tx: parking_lot::Mutex::new(cfg.read_only),
203            db_read_only: cfg.read_only,
204            auto_commit: true,
205            adaptive_config: cfg.adaptive_config,
206            factorized_execution: cfg.factorized_execution,
207            graph_model: cfg.graph_model,
208            query_timeout: cfg.query_timeout,
209            commit_counter: cfg.commit_counter,
210            gc_interval: cfg.gc_interval,
211            transaction_start_node_count: AtomicUsize::new(0),
212            transaction_start_edge_count: AtomicUsize::new(0),
213            #[cfg(feature = "wal")]
214            wal: None,
215            #[cfg(feature = "wal")]
216            wal_graph_context: None,
217            #[cfg(feature = "cdc")]
218            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
219            current_graph: parking_lot::Mutex::new(None),
220            current_schema: parking_lot::Mutex::new(None),
221            time_zone: parking_lot::Mutex::new(None),
222            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
223            viewing_epoch_override: parking_lot::Mutex::new(None),
224            savepoints: parking_lot::Mutex::new(Vec::new()),
225            transaction_nesting_depth: parking_lot::Mutex::new(0),
226            touched_graphs: parking_lot::Mutex::new(Vec::new()),
227            #[cfg(feature = "metrics")]
228            metrics: None,
229            #[cfg(feature = "metrics")]
230            tx_start_time: parking_lot::Mutex::new(None),
231        }
232    }
233
234    /// Sets the WAL for this session (shared with the database).
235    ///
236    /// This also wraps `graph_store` in a [`WalGraphStore`] so that mutation
237    /// operators (INSERT, DELETE, SET via queries) log to the WAL.
238    #[cfg(feature = "wal")]
239    pub(crate) fn set_wal(
240        &mut self,
241        wal: Arc<grafeo_adapters::storage::wal::LpgWal>,
242        wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
243    ) {
244        // Wrap the graph store so query-engine mutations are WAL-logged
245        let wal_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
246            Arc::clone(&self.store),
247            Arc::clone(&wal),
248            Arc::clone(&wal_graph_context),
249        ));
250        self.graph_store = Arc::clone(&wal_store) as Arc<dyn GraphStore>;
251        self.graph_store_mut = Some(wal_store as Arc<dyn GraphStoreMut>);
252        self.wal = Some(wal);
253        self.wal_graph_context = Some(wal_graph_context);
254    }
255
256    /// Sets the CDC log for this session (shared with the database).
257    #[cfg(feature = "cdc")]
258    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
259        self.cdc_log = cdc_log;
260    }
261
262    /// Sets the metrics registry for this session (shared with the database).
263    #[cfg(feature = "metrics")]
264    pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
265        self.metrics = Some(metrics);
266    }
267
268    /// Creates a session backed by an external graph store.
269    ///
270    /// The external store handles all data operations. Transaction management
271    /// (begin/commit/rollback) is not supported for external stores.
272    ///
273    /// # Errors
274    ///
275    /// Returns an error if the internal arena allocation fails (out of memory).
276    pub(crate) fn with_external_store(
277        read_store: Arc<dyn GraphStore>,
278        write_store: Option<Arc<dyn GraphStoreMut>>,
279        cfg: SessionConfig,
280    ) -> Result<Self> {
281        Ok(Self {
282            store: Arc::new(LpgStore::new()?),
283            graph_store: read_store,
284            graph_store_mut: write_store,
285            catalog: cfg.catalog,
286            #[cfg(feature = "rdf")]
287            rdf_store: Arc::new(RdfStore::new()),
288            transaction_manager: cfg.transaction_manager,
289            query_cache: cfg.query_cache,
290            current_transaction: parking_lot::Mutex::new(None),
291            read_only_tx: parking_lot::Mutex::new(cfg.read_only),
292            db_read_only: cfg.read_only,
293            auto_commit: true,
294            adaptive_config: cfg.adaptive_config,
295            factorized_execution: cfg.factorized_execution,
296            graph_model: cfg.graph_model,
297            query_timeout: cfg.query_timeout,
298            commit_counter: cfg.commit_counter,
299            gc_interval: cfg.gc_interval,
300            transaction_start_node_count: AtomicUsize::new(0),
301            transaction_start_edge_count: AtomicUsize::new(0),
302            #[cfg(feature = "wal")]
303            wal: None,
304            #[cfg(feature = "wal")]
305            wal_graph_context: None,
306            #[cfg(feature = "cdc")]
307            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
308            current_graph: parking_lot::Mutex::new(None),
309            current_schema: parking_lot::Mutex::new(None),
310            time_zone: parking_lot::Mutex::new(None),
311            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
312            viewing_epoch_override: parking_lot::Mutex::new(None),
313            savepoints: parking_lot::Mutex::new(Vec::new()),
314            transaction_nesting_depth: parking_lot::Mutex::new(0),
315            touched_graphs: parking_lot::Mutex::new(Vec::new()),
316            #[cfg(feature = "metrics")]
317            metrics: None,
318            #[cfg(feature = "metrics")]
319            tx_start_time: parking_lot::Mutex::new(None),
320        })
321    }
322
323    /// Returns the graph model this session operates on.
324    #[must_use]
325    pub fn graph_model(&self) -> GraphModel {
326        self.graph_model
327    }
328
329    // === Session State Management ===
330
331    /// Sets the current graph for this session (USE GRAPH).
332    pub fn use_graph(&self, name: &str) {
333        *self.current_graph.lock() = Some(name.to_string());
334    }
335
336    /// Returns the current graph name, if any.
337    #[must_use]
338    pub fn current_graph(&self) -> Option<String> {
339        self.current_graph.lock().clone()
340    }
341
342    /// Sets the current schema for this session (SESSION SET SCHEMA).
343    ///
344    /// Per ISO/IEC 39075 Section 7.1 GR1, this is independent of the session graph.
345    pub fn set_schema(&self, name: &str) {
346        *self.current_schema.lock() = Some(name.to_string());
347    }
348
349    /// Returns the current schema name, if any.
350    ///
351    /// `None` means "not set", which resolves to the default schema.
352    #[must_use]
353    pub fn current_schema(&self) -> Option<String> {
354        self.current_schema.lock().clone()
355    }
356
357    /// Computes the effective storage key for a graph, accounting for schema context.
358    ///
359    /// Per ISO/IEC 39075 Section 17.2, graphs resolve relative to the current schema.
360    /// Uses `/` as separator since it is invalid in GQL identifiers.
361    fn effective_graph_key(&self, graph_name: &str) -> String {
362        let schema = self.current_schema.lock().clone();
363        match schema {
364            Some(s) => format!("{s}/{graph_name}"),
365            None => graph_name.to_string(),
366        }
367    }
368
369    /// Computes the effective storage key for a type, accounting for schema context.
370    ///
371    /// Mirrors `effective_graph_key()`: types resolve relative to the current schema.
372    fn effective_type_key(&self, type_name: &str) -> String {
373        let schema = self.current_schema.lock().clone();
374        match schema {
375            Some(s) => format!("{s}/{type_name}"),
376            None => type_name.to_string(),
377        }
378    }
379
380    /// Returns the effective storage key for the current graph, accounting for schema.
381    ///
382    /// Combines `current_schema` and `current_graph` into a flat lookup key.
383    fn active_graph_storage_key(&self) -> Option<String> {
384        let graph = self.current_graph.lock().clone();
385        let schema = self.current_schema.lock().clone();
386        match (schema, graph) {
387            (_, None) => None,
388            (_, Some(ref name)) if name.eq_ignore_ascii_case("default") => None,
389            (None, Some(name)) => Some(name),
390            (Some(s), Some(g)) => Some(format!("{s}/{g}")),
391        }
392    }
393
394    /// Returns the graph store for the currently active graph.
395    ///
396    /// If `current_graph` is `None` or `"default"`, returns the session's
397    /// default `graph_store` (already WAL-wrapped for the default graph).
398    /// Otherwise looks up the named graph in the root store and wraps it
399    /// in a [`WalGraphStore`] so mutations are WAL-logged with the correct
400    /// graph context.
401    fn active_store(&self) -> Arc<dyn GraphStore> {
402        let key = self.active_graph_storage_key();
403        match key {
404            None => Arc::clone(&self.graph_store),
405            Some(ref name) => match self.store.graph(name) {
406                Some(named_store) => {
407                    #[cfg(feature = "wal")]
408                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
409                        return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
410                            named_store,
411                            Arc::clone(wal),
412                            name.clone(),
413                            Arc::clone(ctx),
414                        )) as Arc<dyn GraphStore>;
415                    }
416                    named_store as Arc<dyn GraphStore>
417                }
418                None => Arc::clone(&self.graph_store),
419            },
420        }
421    }
422
423    /// Returns the writable store for the active graph, if available.
424    ///
425    /// Returns `None` for read-only databases. For named graphs, wraps
426    /// the store with WAL logging when durability is enabled.
427    fn active_write_store(&self) -> Option<Arc<dyn GraphStoreMut>> {
428        let key = self.active_graph_storage_key();
429        match key {
430            None => self.graph_store_mut.as_ref().map(Arc::clone),
431            Some(ref name) => match self.store.graph(name) {
432                Some(named_store) => {
433                    #[cfg(feature = "wal")]
434                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
435                        return Some(Arc::new(
436                            crate::database::wal_store::WalGraphStore::new_for_graph(
437                                named_store,
438                                Arc::clone(wal),
439                                name.clone(),
440                                Arc::clone(ctx),
441                            ),
442                        ) as Arc<dyn GraphStoreMut>);
443                    }
444                    Some(named_store as Arc<dyn GraphStoreMut>)
445                }
446                None => self.graph_store_mut.as_ref().map(Arc::clone),
447            },
448        }
449    }
450
451    /// Returns the concrete `LpgStore` for the currently active graph.
452    ///
453    /// Used by direct CRUD methods that need the concrete store type
454    /// for versioned operations.
455    fn active_lpg_store(&self) -> Arc<LpgStore> {
456        let key = self.active_graph_storage_key();
457        match key {
458            None => Arc::clone(&self.store),
459            Some(ref name) => self
460                .store
461                .graph(name)
462                .unwrap_or_else(|| Arc::clone(&self.store)),
463        }
464    }
465
466    /// Resolves a graph name to a concrete `LpgStore`.
467    /// `None` and `"default"` resolve to the session's root store.
468    fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
469        match graph_name {
470            None => Arc::clone(&self.store),
471            Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
472            Some(name) => self
473                .store
474                .graph(name)
475                .unwrap_or_else(|| Arc::clone(&self.store)),
476        }
477    }
478
479    /// Records the current graph as "touched" if a transaction is active.
480    ///
481    /// Uses the full storage key (schema/graph) so that commit/rollback
482    /// can resolve the correct store via `resolve_store`.
483    fn track_graph_touch(&self) {
484        if self.current_transaction.lock().is_some() {
485            let key = self.active_graph_storage_key();
486            let mut touched = self.touched_graphs.lock();
487            if !touched.contains(&key) {
488                touched.push(key);
489            }
490        }
491    }
492
493    /// Sets the session time zone.
494    pub fn set_time_zone(&self, tz: &str) {
495        *self.time_zone.lock() = Some(tz.to_string());
496    }
497
498    /// Returns the session time zone, if set.
499    #[must_use]
500    pub fn time_zone(&self) -> Option<String> {
501        self.time_zone.lock().clone()
502    }
503
504    /// Sets a session parameter.
505    pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
506        self.session_params.lock().insert(key.to_string(), value);
507    }
508
509    /// Gets a session parameter by cloning it.
510    #[must_use]
511    pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
512        self.session_params.lock().get(key).cloned()
513    }
514
515    /// Resets all session state to defaults (ISO/IEC 39075 Section 7.2).
516    pub fn reset_session(&self) {
517        *self.current_schema.lock() = None;
518        *self.current_graph.lock() = None;
519        *self.time_zone.lock() = None;
520        self.session_params.lock().clear();
521        *self.viewing_epoch_override.lock() = None;
522    }
523
524    /// Resets only the session schema (Section 7.2 GR1).
525    pub fn reset_schema(&self) {
526        *self.current_schema.lock() = None;
527    }
528
529    /// Resets only the session graph (Section 7.2 GR2).
530    pub fn reset_graph(&self) {
531        *self.current_graph.lock() = None;
532    }
533
534    /// Resets only the session time zone (Section 7.2 GR3).
535    pub fn reset_time_zone(&self) {
536        *self.time_zone.lock() = None;
537    }
538
539    /// Resets only session parameters (Section 7.2 GR4).
540    pub fn reset_parameters(&self) {
541        self.session_params.lock().clear();
542    }
543
544    // --- Time-travel API ---
545
546    /// Sets a viewing epoch override for time-travel queries.
547    ///
548    /// While set, all queries on this session see the database as it existed
549    /// at the given epoch. Use [`clear_viewing_epoch`](Self::clear_viewing_epoch)
550    /// to return to normal behavior.
551    pub fn set_viewing_epoch(&self, epoch: EpochId) {
552        *self.viewing_epoch_override.lock() = Some(epoch);
553    }
554
555    /// Clears the viewing epoch override, returning to normal behavior.
556    pub fn clear_viewing_epoch(&self) {
557        *self.viewing_epoch_override.lock() = None;
558    }
559
560    /// Returns the current viewing epoch override, if any.
561    #[must_use]
562    pub fn viewing_epoch(&self) -> Option<EpochId> {
563        *self.viewing_epoch_override.lock()
564    }
565
566    /// Returns all versions of a node with their creation/deletion epochs.
567    ///
568    /// Properties and labels reflect the current state (not versioned per-epoch).
569    #[must_use]
570    pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
571        self.active_lpg_store().get_node_history(id)
572    }
573
574    /// Returns all versions of an edge with their creation/deletion epochs.
575    ///
576    /// Properties reflect the current state (not versioned per-epoch).
577    #[must_use]
578    pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
579        self.active_lpg_store().get_edge_history(id)
580    }
581
582    /// Checks that the session's graph model supports LPG operations.
583    fn require_lpg(&self, language: &str) -> Result<()> {
584        if self.graph_model == GraphModel::Rdf {
585            return Err(grafeo_common::utils::error::Error::Internal(format!(
586                "This is an RDF database. {language} queries require an LPG database."
587            )));
588        }
589        Ok(())
590    }
591
592    /// Executes a session or transaction command, returning an empty result.
593    #[cfg(feature = "gql")]
594    fn execute_session_command(
595        &self,
596        cmd: grafeo_adapters::query::gql::ast::SessionCommand,
597    ) -> Result<QueryResult> {
598        use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
599        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
600
601        // Block DDL in read-only transactions (ISO/IEC 39075 Section 8)
602        if *self.read_only_tx.lock() {
603            match &cmd {
604                SessionCommand::CreateGraph { .. } | SessionCommand::DropGraph { .. } => {
605                    return Err(Error::Transaction(
606                        grafeo_common::utils::error::TransactionError::ReadOnly,
607                    ));
608                }
609                _ => {} // Session state + transaction control allowed
610            }
611        }
612
613        match cmd {
614            SessionCommand::CreateGraph {
615                name,
616                if_not_exists,
617                typed,
618                like_graph,
619                copy_of,
620                open: _,
621            } => {
622                // ISO/IEC 39075 Section 12.4: graphs are created within the current schema
623                let storage_key = self.effective_graph_key(&name);
624
625                // Validate source graph exists for LIKE / AS COPY OF
626                if let Some(ref src) = like_graph {
627                    let src_key = self.effective_graph_key(src);
628                    if self.store.graph(&src_key).is_none() {
629                        return Err(Error::Query(QueryError::new(
630                            QueryErrorKind::Semantic,
631                            format!("Source graph '{src}' does not exist"),
632                        )));
633                    }
634                }
635                if let Some(ref src) = copy_of {
636                    let src_key = self.effective_graph_key(src);
637                    if self.store.graph(&src_key).is_none() {
638                        return Err(Error::Query(QueryError::new(
639                            QueryErrorKind::Semantic,
640                            format!("Source graph '{src}' does not exist"),
641                        )));
642                    }
643                }
644
645                let created = self
646                    .store
647                    .create_graph(&storage_key)
648                    .map_err(|e| Error::Internal(e.to_string()))?;
649                if !created && !if_not_exists {
650                    return Err(Error::Query(QueryError::new(
651                        QueryErrorKind::Semantic,
652                        format!("Graph '{name}' already exists"),
653                    )));
654                }
655                if created {
656                    #[cfg(feature = "wal")]
657                    self.log_schema_wal(
658                        &grafeo_adapters::storage::wal::WalRecord::CreateNamedGraph {
659                            name: storage_key.clone(),
660                        },
661                    );
662                }
663
664                // AS COPY OF: copy data from source graph
665                if let Some(ref src) = copy_of {
666                    let src_key = self.effective_graph_key(src);
667                    self.store
668                        .copy_graph(Some(&src_key), Some(&storage_key))
669                        .map_err(|e| Error::Internal(e.to_string()))?;
670                }
671
672                // Bind to graph type if specified.
673                // If the parser produced a '/' in the name it is already a qualified
674                // "schema/type" key; otherwise resolve against the current schema.
675                if let Some(type_name) = typed
676                    && let Err(e) = self.catalog.bind_graph_type(
677                        &storage_key,
678                        if type_name.contains('/') {
679                            type_name.clone()
680                        } else {
681                            self.effective_type_key(&type_name)
682                        },
683                    )
684                {
685                    return Err(Error::Query(QueryError::new(
686                        QueryErrorKind::Semantic,
687                        e.to_string(),
688                    )));
689                }
690
691                // LIKE: copy graph type binding from source
692                if let Some(ref src) = like_graph {
693                    let src_key = self.effective_graph_key(src);
694                    if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
695                        let _ = self.catalog.bind_graph_type(&storage_key, src_type);
696                    }
697                }
698
699                Ok(QueryResult::empty())
700            }
701            SessionCommand::DropGraph { name, if_exists } => {
702                let storage_key = self.effective_graph_key(&name);
703                let dropped = self.store.drop_graph(&storage_key);
704                if !dropped && !if_exists {
705                    return Err(Error::Query(QueryError::new(
706                        QueryErrorKind::Semantic,
707                        format!("Graph '{name}' does not exist"),
708                    )));
709                }
710                if dropped {
711                    #[cfg(feature = "wal")]
712                    self.log_schema_wal(
713                        &grafeo_adapters::storage::wal::WalRecord::DropNamedGraph {
714                            name: storage_key.clone(),
715                        },
716                    );
717                    // If this session was using the dropped graph, reset to default
718                    let mut current = self.current_graph.lock();
719                    if current
720                        .as_deref()
721                        .is_some_and(|g| g.eq_ignore_ascii_case(&name))
722                    {
723                        *current = None;
724                    }
725                }
726                Ok(QueryResult::empty())
727            }
728            SessionCommand::UseGraph(name) => {
729                // Verify graph exists (resolve within current schema)
730                let effective_key = self.effective_graph_key(&name);
731                if !name.eq_ignore_ascii_case("default")
732                    && self.store.graph(&effective_key).is_none()
733                {
734                    return Err(Error::Query(QueryError::new(
735                        QueryErrorKind::Semantic,
736                        format!("Graph '{name}' does not exist"),
737                    )));
738                }
739                self.use_graph(&name);
740                // Track the new graph if in a transaction
741                self.track_graph_touch();
742                Ok(QueryResult::empty())
743            }
744            SessionCommand::SessionSetGraph(name) => {
745                // ISO/IEC 39075 Section 7.1 GR2: set session graph (resolved within current schema)
746                let effective_key = self.effective_graph_key(&name);
747                if !name.eq_ignore_ascii_case("default")
748                    && self.store.graph(&effective_key).is_none()
749                {
750                    return Err(Error::Query(QueryError::new(
751                        QueryErrorKind::Semantic,
752                        format!("Graph '{name}' does not exist"),
753                    )));
754                }
755                self.use_graph(&name);
756                // Track the new graph if in a transaction
757                self.track_graph_touch();
758                Ok(QueryResult::empty())
759            }
760            SessionCommand::SessionSetSchema(name) => {
761                // ISO/IEC 39075 Section 7.1 GR1: set session schema (independent of graph)
762                if !self.catalog.schema_exists(&name) {
763                    return Err(Error::Query(QueryError::new(
764                        QueryErrorKind::Semantic,
765                        format!("Schema '{name}' does not exist"),
766                    )));
767                }
768                self.set_schema(&name);
769                Ok(QueryResult::empty())
770            }
771            SessionCommand::SessionSetTimeZone(tz) => {
772                self.set_time_zone(&tz);
773                Ok(QueryResult::empty())
774            }
775            SessionCommand::SessionSetParameter(key, expr) => {
776                if key.eq_ignore_ascii_case("viewing_epoch") {
777                    match Self::eval_integer_literal(&expr) {
778                        Some(n) if n >= 0 => {
779                            self.set_viewing_epoch(EpochId::new(n as u64));
780                            Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
781                        }
782                        _ => Err(Error::Query(QueryError::new(
783                            QueryErrorKind::Semantic,
784                            "viewing_epoch must be a non-negative integer literal",
785                        ))),
786                    }
787                } else {
788                    // For now, store parameter name with Null value.
789                    // Full expression evaluation would require building and executing a plan.
790                    self.set_parameter(&key, Value::Null);
791                    Ok(QueryResult::empty())
792                }
793            }
794            SessionCommand::SessionReset(target) => {
795                use grafeo_adapters::query::gql::ast::SessionResetTarget;
796                match target {
797                    SessionResetTarget::All => self.reset_session(),
798                    SessionResetTarget::Schema => self.reset_schema(),
799                    SessionResetTarget::Graph => self.reset_graph(),
800                    SessionResetTarget::TimeZone => self.reset_time_zone(),
801                    SessionResetTarget::Parameters => self.reset_parameters(),
802                }
803                Ok(QueryResult::empty())
804            }
805            SessionCommand::SessionClose => {
806                self.reset_session();
807                Ok(QueryResult::empty())
808            }
809            SessionCommand::StartTransaction {
810                read_only,
811                isolation_level,
812            } => {
813                let engine_level = isolation_level.map(|l| match l {
814                    TransactionIsolationLevel::ReadCommitted => {
815                        crate::transaction::IsolationLevel::ReadCommitted
816                    }
817                    TransactionIsolationLevel::SnapshotIsolation => {
818                        crate::transaction::IsolationLevel::SnapshotIsolation
819                    }
820                    TransactionIsolationLevel::Serializable => {
821                        crate::transaction::IsolationLevel::Serializable
822                    }
823                });
824                self.begin_transaction_inner(read_only, engine_level)?;
825                Ok(QueryResult::status("Transaction started"))
826            }
827            SessionCommand::Commit => {
828                self.commit_inner()?;
829                Ok(QueryResult::status("Transaction committed"))
830            }
831            SessionCommand::Rollback => {
832                self.rollback_inner()?;
833                Ok(QueryResult::status("Transaction rolled back"))
834            }
835            SessionCommand::Savepoint(name) => {
836                self.savepoint(&name)?;
837                Ok(QueryResult::status(format!("Savepoint '{name}' created")))
838            }
839            SessionCommand::RollbackToSavepoint(name) => {
840                self.rollback_to_savepoint(&name)?;
841                Ok(QueryResult::status(format!(
842                    "Rolled back to savepoint '{name}'"
843                )))
844            }
845            SessionCommand::ReleaseSavepoint(name) => {
846                self.release_savepoint(&name)?;
847                Ok(QueryResult::status(format!("Savepoint '{name}' released")))
848            }
849        }
850    }
851
852    /// Logs a WAL record for a schema change (no-op if WAL is not enabled).
853    #[cfg(feature = "wal")]
854    fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
855        if let Some(ref wal) = self.wal
856            && let Err(e) = wal.log(record)
857        {
858            grafeo_warn!("Failed to log schema change to WAL: {}", e);
859        }
860    }
861
862    /// Executes a schema DDL command, returning a status result.
863    #[cfg(feature = "gql")]
864    fn execute_schema_command(
865        &self,
866        cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
867    ) -> Result<QueryResult> {
868        use crate::catalog::{
869            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
870        };
871        use grafeo_adapters::query::gql::ast::SchemaStatement;
872        #[cfg(feature = "wal")]
873        use grafeo_adapters::storage::wal::WalRecord;
874        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
875
876        /// Logs a WAL record for schema changes. Compiles to nothing without `wal`.
877        macro_rules! wal_log {
878            ($self:expr, $record:expr) => {
879                #[cfg(feature = "wal")]
880                $self.log_schema_wal(&$record);
881            };
882        }
883
884        let result = match cmd {
885            SchemaStatement::CreateNodeType(stmt) => {
886                let effective_name = self.effective_type_key(&stmt.name);
887                #[cfg(feature = "wal")]
888                let props_for_wal: Vec<(String, String, bool)> = stmt
889                    .properties
890                    .iter()
891                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
892                    .collect();
893                let def = NodeTypeDefinition {
894                    name: effective_name.clone(),
895                    properties: stmt
896                        .properties
897                        .iter()
898                        .map(|p| TypedProperty {
899                            name: p.name.clone(),
900                            data_type: PropertyDataType::from_type_name(&p.data_type),
901                            nullable: p.nullable,
902                            default_value: p
903                                .default_value
904                                .as_ref()
905                                .map(|s| parse_default_literal(s)),
906                        })
907                        .collect(),
908                    constraints: Vec::new(),
909                    parent_types: stmt.parent_types.clone(),
910                };
911                let result = if stmt.or_replace {
912                    let _ = self.catalog.drop_node_type(&effective_name);
913                    self.catalog.register_node_type(def)
914                } else {
915                    self.catalog.register_node_type(def)
916                };
917                match result {
918                    Ok(()) => {
919                        wal_log!(
920                            self,
921                            WalRecord::CreateNodeType {
922                                name: effective_name.clone(),
923                                properties: props_for_wal,
924                                constraints: Vec::new(),
925                            }
926                        );
927                        Ok(QueryResult::status(format!(
928                            "Created node type '{}'",
929                            stmt.name
930                        )))
931                    }
932                    Err(e) if stmt.if_not_exists => {
933                        let _ = e;
934                        Ok(QueryResult::status("No change"))
935                    }
936                    Err(e) => Err(Error::Query(QueryError::new(
937                        QueryErrorKind::Semantic,
938                        e.to_string(),
939                    ))),
940                }
941            }
942            SchemaStatement::CreateEdgeType(stmt) => {
943                let effective_name = self.effective_type_key(&stmt.name);
944                #[cfg(feature = "wal")]
945                let props_for_wal: Vec<(String, String, bool)> = stmt
946                    .properties
947                    .iter()
948                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
949                    .collect();
950                let def = EdgeTypeDefinition {
951                    name: effective_name.clone(),
952                    properties: stmt
953                        .properties
954                        .iter()
955                        .map(|p| TypedProperty {
956                            name: p.name.clone(),
957                            data_type: PropertyDataType::from_type_name(&p.data_type),
958                            nullable: p.nullable,
959                            default_value: p
960                                .default_value
961                                .as_ref()
962                                .map(|s| parse_default_literal(s)),
963                        })
964                        .collect(),
965                    constraints: Vec::new(),
966                    source_node_types: stmt.source_node_types.clone(),
967                    target_node_types: stmt.target_node_types.clone(),
968                };
969                let result = if stmt.or_replace {
970                    let _ = self.catalog.drop_edge_type_def(&effective_name);
971                    self.catalog.register_edge_type_def(def)
972                } else {
973                    self.catalog.register_edge_type_def(def)
974                };
975                match result {
976                    Ok(()) => {
977                        wal_log!(
978                            self,
979                            WalRecord::CreateEdgeType {
980                                name: effective_name.clone(),
981                                properties: props_for_wal,
982                                constraints: Vec::new(),
983                            }
984                        );
985                        Ok(QueryResult::status(format!(
986                            "Created edge type '{}'",
987                            stmt.name
988                        )))
989                    }
990                    Err(e) if stmt.if_not_exists => {
991                        let _ = e;
992                        Ok(QueryResult::status("No change"))
993                    }
994                    Err(e) => Err(Error::Query(QueryError::new(
995                        QueryErrorKind::Semantic,
996                        e.to_string(),
997                    ))),
998                }
999            }
1000            SchemaStatement::CreateVectorIndex(stmt) => {
1001                Self::create_vector_index_on_store(
1002                    &self.active_lpg_store(),
1003                    &stmt.node_label,
1004                    &stmt.property,
1005                    stmt.dimensions,
1006                    stmt.metric.as_deref(),
1007                )?;
1008                wal_log!(
1009                    self,
1010                    WalRecord::CreateIndex {
1011                        name: stmt.name.clone(),
1012                        label: stmt.node_label.clone(),
1013                        property: stmt.property.clone(),
1014                        index_type: "vector".to_string(),
1015                    }
1016                );
1017                Ok(QueryResult::status(format!(
1018                    "Created vector index '{}'",
1019                    stmt.name
1020                )))
1021            }
1022            SchemaStatement::DropNodeType { name, if_exists } => {
1023                let effective_name = self.effective_type_key(&name);
1024                match self.catalog.drop_node_type(&effective_name) {
1025                    Ok(()) => {
1026                        wal_log!(
1027                            self,
1028                            WalRecord::DropNodeType {
1029                                name: effective_name
1030                            }
1031                        );
1032                        Ok(QueryResult::status(format!("Dropped node type '{name}'")))
1033                    }
1034                    Err(e) if if_exists => {
1035                        let _ = e;
1036                        Ok(QueryResult::status("No change"))
1037                    }
1038                    Err(e) => Err(Error::Query(QueryError::new(
1039                        QueryErrorKind::Semantic,
1040                        e.to_string(),
1041                    ))),
1042                }
1043            }
1044            SchemaStatement::DropEdgeType { name, if_exists } => {
1045                let effective_name = self.effective_type_key(&name);
1046                match self.catalog.drop_edge_type_def(&effective_name) {
1047                    Ok(()) => {
1048                        wal_log!(
1049                            self,
1050                            WalRecord::DropEdgeType {
1051                                name: effective_name
1052                            }
1053                        );
1054                        Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
1055                    }
1056                    Err(e) if if_exists => {
1057                        let _ = e;
1058                        Ok(QueryResult::status("No change"))
1059                    }
1060                    Err(e) => Err(Error::Query(QueryError::new(
1061                        QueryErrorKind::Semantic,
1062                        e.to_string(),
1063                    ))),
1064                }
1065            }
1066            SchemaStatement::CreateIndex(stmt) => {
1067                use crate::catalog::IndexType as CatalogIndexType;
1068                use grafeo_adapters::query::gql::ast::IndexKind;
1069                let active = self.active_lpg_store();
1070                let index_type_str = match stmt.index_kind {
1071                    IndexKind::Property => "property",
1072                    IndexKind::BTree => "btree",
1073                    IndexKind::Text => "text",
1074                    IndexKind::Vector => "vector",
1075                };
1076                match stmt.index_kind {
1077                    IndexKind::Property | IndexKind::BTree => {
1078                        for prop in &stmt.properties {
1079                            active.create_property_index(prop);
1080                        }
1081                    }
1082                    IndexKind::Text => {
1083                        for prop in &stmt.properties {
1084                            Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1085                        }
1086                    }
1087                    IndexKind::Vector => {
1088                        for prop in &stmt.properties {
1089                            Self::create_vector_index_on_store(
1090                                &active,
1091                                &stmt.label,
1092                                prop,
1093                                stmt.options.dimensions,
1094                                stmt.options.metric.as_deref(),
1095                            )?;
1096                        }
1097                    }
1098                }
1099                // Register each property index in the catalog for SHOW INDEXES
1100                // and DROP INDEX name-based lookup.
1101                let catalog_index_type = match stmt.index_kind {
1102                    IndexKind::Property => CatalogIndexType::Hash,
1103                    IndexKind::BTree => CatalogIndexType::BTree,
1104                    IndexKind::Text => CatalogIndexType::FullText,
1105                    IndexKind::Vector => CatalogIndexType::Hash,
1106                };
1107                let label_id = self.catalog.get_or_create_label(&stmt.label);
1108                for prop in &stmt.properties {
1109                    let prop_id = self.catalog.get_or_create_property_key(prop);
1110                    self.catalog
1111                        .create_index(&stmt.name, label_id, prop_id, catalog_index_type);
1112                }
1113                #[cfg(feature = "wal")]
1114                for prop in &stmt.properties {
1115                    wal_log!(
1116                        self,
1117                        WalRecord::CreateIndex {
1118                            name: stmt.name.clone(),
1119                            label: stmt.label.clone(),
1120                            property: prop.clone(),
1121                            index_type: index_type_str.to_string(),
1122                        }
1123                    );
1124                }
1125                Ok(QueryResult::status(format!(
1126                    "Created {} index '{}'",
1127                    index_type_str, stmt.name
1128                )))
1129            }
1130            SchemaStatement::DropIndex { name, if_exists } => {
1131                // Look up the index by name in the catalog to find the
1132                // underlying property, then drop from both catalog and store.
1133                if let Some(index_id) = self.catalog.find_index_by_name(&name) {
1134                    let def = self.catalog.get_index(index_id);
1135                    self.catalog.drop_index(index_id);
1136                    if let Some(def) = def
1137                        && let Some(prop_name) =
1138                            self.catalog.get_property_key_name(def.property_key)
1139                    {
1140                        self.active_lpg_store().drop_property_index(&prop_name);
1141                    }
1142                    wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1143                    Ok(QueryResult::status(format!("Dropped index '{name}'")))
1144                } else if if_exists {
1145                    Ok(QueryResult::status("No change".to_string()))
1146                } else {
1147                    Err(Error::Query(QueryError::new(
1148                        QueryErrorKind::Semantic,
1149                        format!("Index '{name}' does not exist"),
1150                    )))
1151                }
1152            }
1153            SchemaStatement::CreateConstraint(stmt) => {
1154                use crate::catalog::TypeConstraint;
1155                use grafeo_adapters::query::gql::ast::ConstraintKind;
1156                let kind_str = match stmt.constraint_kind {
1157                    ConstraintKind::Unique => "unique",
1158                    ConstraintKind::NodeKey => "node_key",
1159                    ConstraintKind::NotNull => "not_null",
1160                    ConstraintKind::Exists => "exists",
1161                };
1162                let constraint_name = stmt
1163                    .name
1164                    .clone()
1165                    .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1166
1167                // Register constraint in catalog type definitions
1168                match stmt.constraint_kind {
1169                    ConstraintKind::Unique => {
1170                        for prop in &stmt.properties {
1171                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1172                            let prop_id = self.catalog.get_or_create_property_key(prop);
1173                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1174                        }
1175                        let _ = self.catalog.add_constraint_to_type(
1176                            &stmt.label,
1177                            TypeConstraint::Unique(stmt.properties.clone()),
1178                        );
1179                    }
1180                    ConstraintKind::NodeKey => {
1181                        for prop in &stmt.properties {
1182                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1183                            let prop_id = self.catalog.get_or_create_property_key(prop);
1184                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1185                            let _ = self.catalog.add_required_property(label_id, prop_id);
1186                        }
1187                        let _ = self.catalog.add_constraint_to_type(
1188                            &stmt.label,
1189                            TypeConstraint::PrimaryKey(stmt.properties.clone()),
1190                        );
1191                    }
1192                    ConstraintKind::NotNull | ConstraintKind::Exists => {
1193                        for prop in &stmt.properties {
1194                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1195                            let prop_id = self.catalog.get_or_create_property_key(prop);
1196                            let _ = self.catalog.add_required_property(label_id, prop_id);
1197                            let _ = self.catalog.add_constraint_to_type(
1198                                &stmt.label,
1199                                TypeConstraint::NotNull(prop.clone()),
1200                            );
1201                        }
1202                    }
1203                }
1204
1205                wal_log!(
1206                    self,
1207                    WalRecord::CreateConstraint {
1208                        name: constraint_name.clone(),
1209                        label: stmt.label.clone(),
1210                        properties: stmt.properties.clone(),
1211                        kind: kind_str.to_string(),
1212                    }
1213                );
1214                Ok(QueryResult::status(format!(
1215                    "Created {kind_str} constraint '{constraint_name}'"
1216                )))
1217            }
1218            SchemaStatement::DropConstraint { name, if_exists } => {
1219                let _ = if_exists;
1220                wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1221                Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1222            }
1223            SchemaStatement::CreateGraphType(stmt) => {
1224                use crate::catalog::GraphTypeDefinition;
1225                use grafeo_adapters::query::gql::ast::InlineElementType;
1226
1227                let effective_name = self.effective_type_key(&stmt.name);
1228
1229                // GG04: LIKE clause copies type from existing graph
1230                let (mut node_types, mut edge_types, open) =
1231                    if let Some(ref like_graph) = stmt.like_graph {
1232                        // Infer types from the graph's bound type, or use its existing types
1233                        if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1234                            if let Some(existing) = self
1235                                .catalog
1236                                .schema()
1237                                .and_then(|s| s.get_graph_type(&type_name))
1238                            {
1239                                (
1240                                    existing.allowed_node_types.clone(),
1241                                    existing.allowed_edge_types.clone(),
1242                                    existing.open,
1243                                )
1244                            } else {
1245                                (Vec::new(), Vec::new(), true)
1246                            }
1247                        } else {
1248                            // GG22: Infer from graph data (labels used in graph)
1249                            let nt = self.catalog.all_node_type_names();
1250                            let et = self.catalog.all_edge_type_names();
1251                            if nt.is_empty() && et.is_empty() {
1252                                (Vec::new(), Vec::new(), true)
1253                            } else {
1254                                (nt, et, false)
1255                            }
1256                        }
1257                    } else {
1258                        // Prefix element type names with schema for consistency
1259                        let nt = stmt
1260                            .node_types
1261                            .iter()
1262                            .map(|n| self.effective_type_key(n))
1263                            .collect();
1264                        let et = stmt
1265                            .edge_types
1266                            .iter()
1267                            .map(|n| self.effective_type_key(n))
1268                            .collect();
1269                        (nt, et, stmt.open)
1270                    };
1271
1272                // GG03: Register inline element types and add their names
1273                for inline in &stmt.inline_types {
1274                    match inline {
1275                        InlineElementType::Node {
1276                            name,
1277                            properties,
1278                            key_labels,
1279                            ..
1280                        } => {
1281                            let inline_effective = self.effective_type_key(name);
1282                            let def = NodeTypeDefinition {
1283                                name: inline_effective.clone(),
1284                                properties: properties
1285                                    .iter()
1286                                    .map(|p| TypedProperty {
1287                                        name: p.name.clone(),
1288                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1289                                        nullable: p.nullable,
1290                                        default_value: None,
1291                                    })
1292                                    .collect(),
1293                                constraints: Vec::new(),
1294                                parent_types: key_labels.clone(),
1295                            };
1296                            // Register or replace so inline defs override existing
1297                            self.catalog.register_or_replace_node_type(def);
1298                            #[cfg(feature = "wal")]
1299                            {
1300                                let props_for_wal: Vec<(String, String, bool)> = properties
1301                                    .iter()
1302                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1303                                    .collect();
1304                                self.log_schema_wal(&WalRecord::CreateNodeType {
1305                                    name: inline_effective.clone(),
1306                                    properties: props_for_wal,
1307                                    constraints: Vec::new(),
1308                                });
1309                            }
1310                            if !node_types.contains(&inline_effective) {
1311                                node_types.push(inline_effective);
1312                            }
1313                        }
1314                        InlineElementType::Edge {
1315                            name,
1316                            properties,
1317                            source_node_types,
1318                            target_node_types,
1319                            ..
1320                        } => {
1321                            let inline_effective = self.effective_type_key(name);
1322                            let def = EdgeTypeDefinition {
1323                                name: inline_effective.clone(),
1324                                properties: properties
1325                                    .iter()
1326                                    .map(|p| TypedProperty {
1327                                        name: p.name.clone(),
1328                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1329                                        nullable: p.nullable,
1330                                        default_value: None,
1331                                    })
1332                                    .collect(),
1333                                constraints: Vec::new(),
1334                                source_node_types: source_node_types.clone(),
1335                                target_node_types: target_node_types.clone(),
1336                            };
1337                            self.catalog.register_or_replace_edge_type_def(def);
1338                            #[cfg(feature = "wal")]
1339                            {
1340                                let props_for_wal: Vec<(String, String, bool)> = properties
1341                                    .iter()
1342                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1343                                    .collect();
1344                                self.log_schema_wal(&WalRecord::CreateEdgeType {
1345                                    name: inline_effective.clone(),
1346                                    properties: props_for_wal,
1347                                    constraints: Vec::new(),
1348                                });
1349                            }
1350                            if !edge_types.contains(&inline_effective) {
1351                                edge_types.push(inline_effective);
1352                            }
1353                        }
1354                    }
1355                }
1356
1357                let def = GraphTypeDefinition {
1358                    name: effective_name.clone(),
1359                    allowed_node_types: node_types.clone(),
1360                    allowed_edge_types: edge_types.clone(),
1361                    open,
1362                };
1363                let result = if stmt.or_replace {
1364                    // Drop existing first, ignore error if not found
1365                    let _ = self.catalog.drop_graph_type(&effective_name);
1366                    self.catalog.register_graph_type(def)
1367                } else {
1368                    self.catalog.register_graph_type(def)
1369                };
1370                match result {
1371                    Ok(()) => {
1372                        wal_log!(
1373                            self,
1374                            WalRecord::CreateGraphType {
1375                                name: effective_name.clone(),
1376                                node_types,
1377                                edge_types,
1378                                open,
1379                            }
1380                        );
1381                        Ok(QueryResult::status(format!(
1382                            "Created graph type '{}'",
1383                            stmt.name
1384                        )))
1385                    }
1386                    Err(e) if stmt.if_not_exists => {
1387                        let _ = e;
1388                        Ok(QueryResult::status("No change"))
1389                    }
1390                    Err(e) => Err(Error::Query(QueryError::new(
1391                        QueryErrorKind::Semantic,
1392                        e.to_string(),
1393                    ))),
1394                }
1395            }
1396            SchemaStatement::DropGraphType { name, if_exists } => {
1397                let effective_name = self.effective_type_key(&name);
1398                match self.catalog.drop_graph_type(&effective_name) {
1399                    Ok(()) => {
1400                        wal_log!(
1401                            self,
1402                            WalRecord::DropGraphType {
1403                                name: effective_name
1404                            }
1405                        );
1406                        Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1407                    }
1408                    Err(e) if if_exists => {
1409                        let _ = e;
1410                        Ok(QueryResult::status("No change"))
1411                    }
1412                    Err(e) => Err(Error::Query(QueryError::new(
1413                        QueryErrorKind::Semantic,
1414                        e.to_string(),
1415                    ))),
1416                }
1417            }
1418            SchemaStatement::CreateSchema {
1419                name,
1420                if_not_exists,
1421            } => match self.catalog.register_schema_namespace(name.clone()) {
1422                Ok(()) => {
1423                    wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1424                    Ok(QueryResult::status(format!("Created schema '{name}'")))
1425                }
1426                Err(e) if if_not_exists => {
1427                    let _ = e;
1428                    Ok(QueryResult::status("No change"))
1429                }
1430                Err(e) => Err(Error::Query(QueryError::new(
1431                    QueryErrorKind::Semantic,
1432                    e.to_string(),
1433                ))),
1434            },
1435            SchemaStatement::DropSchema { name, if_exists } => {
1436                // ISO/IEC 39075 Section 12.3: schema must be empty before dropping
1437                let prefix = format!("{name}/");
1438                let has_graphs = self
1439                    .store
1440                    .graph_names()
1441                    .iter()
1442                    .any(|g| g.starts_with(&prefix));
1443                let has_types = self
1444                    .catalog
1445                    .all_node_type_names()
1446                    .iter()
1447                    .any(|n| n.starts_with(&prefix))
1448                    || self
1449                        .catalog
1450                        .all_edge_type_names()
1451                        .iter()
1452                        .any(|n| n.starts_with(&prefix))
1453                    || self
1454                        .catalog
1455                        .all_graph_type_names()
1456                        .iter()
1457                        .any(|n| n.starts_with(&prefix));
1458                if has_graphs || has_types {
1459                    return Err(Error::Query(QueryError::new(
1460                        QueryErrorKind::Semantic,
1461                        format!("Schema '{name}' is not empty: drop all graphs and types first"),
1462                    )));
1463                }
1464                match self.catalog.drop_schema_namespace(&name) {
1465                    Ok(()) => {
1466                        wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1467                        // If this session was using the dropped schema, reset it
1468                        let mut current = self.current_schema.lock();
1469                        if current
1470                            .as_deref()
1471                            .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1472                        {
1473                            *current = None;
1474                        }
1475                        Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1476                    }
1477                    Err(e) if if_exists => {
1478                        let _ = e;
1479                        Ok(QueryResult::status("No change"))
1480                    }
1481                    Err(e) => Err(Error::Query(QueryError::new(
1482                        QueryErrorKind::Semantic,
1483                        e.to_string(),
1484                    ))),
1485                }
1486            }
1487            SchemaStatement::AlterNodeType(stmt) => {
1488                use grafeo_adapters::query::gql::ast::TypeAlteration;
1489                let effective_name = self.effective_type_key(&stmt.name);
1490                let mut wal_alts = Vec::new();
1491                for alt in &stmt.alterations {
1492                    match alt {
1493                        TypeAlteration::AddProperty(prop) => {
1494                            let typed = TypedProperty {
1495                                name: prop.name.clone(),
1496                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1497                                nullable: prop.nullable,
1498                                default_value: prop
1499                                    .default_value
1500                                    .as_ref()
1501                                    .map(|s| parse_default_literal(s)),
1502                            };
1503                            self.catalog
1504                                .alter_node_type_add_property(&effective_name, typed)
1505                                .map_err(|e| {
1506                                    Error::Query(QueryError::new(
1507                                        QueryErrorKind::Semantic,
1508                                        e.to_string(),
1509                                    ))
1510                                })?;
1511                            wal_alts.push((
1512                                "add".to_string(),
1513                                prop.name.clone(),
1514                                prop.data_type.clone(),
1515                                prop.nullable,
1516                            ));
1517                        }
1518                        TypeAlteration::DropProperty(name) => {
1519                            self.catalog
1520                                .alter_node_type_drop_property(&effective_name, name)
1521                                .map_err(|e| {
1522                                    Error::Query(QueryError::new(
1523                                        QueryErrorKind::Semantic,
1524                                        e.to_string(),
1525                                    ))
1526                                })?;
1527                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1528                        }
1529                    }
1530                }
1531                wal_log!(
1532                    self,
1533                    WalRecord::AlterNodeType {
1534                        name: effective_name,
1535                        alterations: wal_alts,
1536                    }
1537                );
1538                Ok(QueryResult::status(format!(
1539                    "Altered node type '{}'",
1540                    stmt.name
1541                )))
1542            }
1543            SchemaStatement::AlterEdgeType(stmt) => {
1544                use grafeo_adapters::query::gql::ast::TypeAlteration;
1545                let effective_name = self.effective_type_key(&stmt.name);
1546                let mut wal_alts = Vec::new();
1547                for alt in &stmt.alterations {
1548                    match alt {
1549                        TypeAlteration::AddProperty(prop) => {
1550                            let typed = TypedProperty {
1551                                name: prop.name.clone(),
1552                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1553                                nullable: prop.nullable,
1554                                default_value: prop
1555                                    .default_value
1556                                    .as_ref()
1557                                    .map(|s| parse_default_literal(s)),
1558                            };
1559                            self.catalog
1560                                .alter_edge_type_add_property(&effective_name, typed)
1561                                .map_err(|e| {
1562                                    Error::Query(QueryError::new(
1563                                        QueryErrorKind::Semantic,
1564                                        e.to_string(),
1565                                    ))
1566                                })?;
1567                            wal_alts.push((
1568                                "add".to_string(),
1569                                prop.name.clone(),
1570                                prop.data_type.clone(),
1571                                prop.nullable,
1572                            ));
1573                        }
1574                        TypeAlteration::DropProperty(name) => {
1575                            self.catalog
1576                                .alter_edge_type_drop_property(&effective_name, name)
1577                                .map_err(|e| {
1578                                    Error::Query(QueryError::new(
1579                                        QueryErrorKind::Semantic,
1580                                        e.to_string(),
1581                                    ))
1582                                })?;
1583                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1584                        }
1585                    }
1586                }
1587                wal_log!(
1588                    self,
1589                    WalRecord::AlterEdgeType {
1590                        name: effective_name,
1591                        alterations: wal_alts,
1592                    }
1593                );
1594                Ok(QueryResult::status(format!(
1595                    "Altered edge type '{}'",
1596                    stmt.name
1597                )))
1598            }
1599            SchemaStatement::AlterGraphType(stmt) => {
1600                use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1601                let effective_name = self.effective_type_key(&stmt.name);
1602                let mut wal_alts = Vec::new();
1603                for alt in &stmt.alterations {
1604                    match alt {
1605                        GraphTypeAlteration::AddNodeType(name) => {
1606                            self.catalog
1607                                .alter_graph_type_add_node_type(&effective_name, name.clone())
1608                                .map_err(|e| {
1609                                    Error::Query(QueryError::new(
1610                                        QueryErrorKind::Semantic,
1611                                        e.to_string(),
1612                                    ))
1613                                })?;
1614                            wal_alts.push(("add_node_type".to_string(), name.clone()));
1615                        }
1616                        GraphTypeAlteration::DropNodeType(name) => {
1617                            self.catalog
1618                                .alter_graph_type_drop_node_type(&effective_name, name)
1619                                .map_err(|e| {
1620                                    Error::Query(QueryError::new(
1621                                        QueryErrorKind::Semantic,
1622                                        e.to_string(),
1623                                    ))
1624                                })?;
1625                            wal_alts.push(("drop_node_type".to_string(), name.clone()));
1626                        }
1627                        GraphTypeAlteration::AddEdgeType(name) => {
1628                            self.catalog
1629                                .alter_graph_type_add_edge_type(&effective_name, name.clone())
1630                                .map_err(|e| {
1631                                    Error::Query(QueryError::new(
1632                                        QueryErrorKind::Semantic,
1633                                        e.to_string(),
1634                                    ))
1635                                })?;
1636                            wal_alts.push(("add_edge_type".to_string(), name.clone()));
1637                        }
1638                        GraphTypeAlteration::DropEdgeType(name) => {
1639                            self.catalog
1640                                .alter_graph_type_drop_edge_type(&effective_name, name)
1641                                .map_err(|e| {
1642                                    Error::Query(QueryError::new(
1643                                        QueryErrorKind::Semantic,
1644                                        e.to_string(),
1645                                    ))
1646                                })?;
1647                            wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1648                        }
1649                    }
1650                }
1651                wal_log!(
1652                    self,
1653                    WalRecord::AlterGraphType {
1654                        name: effective_name,
1655                        alterations: wal_alts,
1656                    }
1657                );
1658                Ok(QueryResult::status(format!(
1659                    "Altered graph type '{}'",
1660                    stmt.name
1661                )))
1662            }
1663            SchemaStatement::CreateProcedure(stmt) => {
1664                use crate::catalog::ProcedureDefinition;
1665
1666                let def = ProcedureDefinition {
1667                    name: stmt.name.clone(),
1668                    params: stmt
1669                        .params
1670                        .iter()
1671                        .map(|p| (p.name.clone(), p.param_type.clone()))
1672                        .collect(),
1673                    returns: stmt
1674                        .returns
1675                        .iter()
1676                        .map(|r| (r.name.clone(), r.return_type.clone()))
1677                        .collect(),
1678                    body: stmt.body.clone(),
1679                };
1680
1681                if stmt.or_replace {
1682                    self.catalog.replace_procedure(def).map_err(|e| {
1683                        Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1684                    })?;
1685                } else {
1686                    match self.catalog.register_procedure(def) {
1687                        Ok(()) => {}
1688                        Err(_) if stmt.if_not_exists => {
1689                            return Ok(QueryResult::empty());
1690                        }
1691                        Err(e) => {
1692                            return Err(Error::Query(QueryError::new(
1693                                QueryErrorKind::Semantic,
1694                                e.to_string(),
1695                            )));
1696                        }
1697                    }
1698                }
1699
1700                wal_log!(
1701                    self,
1702                    WalRecord::CreateProcedure {
1703                        name: stmt.name.clone(),
1704                        params: stmt
1705                            .params
1706                            .iter()
1707                            .map(|p| (p.name.clone(), p.param_type.clone()))
1708                            .collect(),
1709                        returns: stmt
1710                            .returns
1711                            .iter()
1712                            .map(|r| (r.name.clone(), r.return_type.clone()))
1713                            .collect(),
1714                        body: stmt.body,
1715                    }
1716                );
1717                Ok(QueryResult::status(format!(
1718                    "Created procedure '{}'",
1719                    stmt.name
1720                )))
1721            }
1722            SchemaStatement::DropProcedure { name, if_exists } => {
1723                match self.catalog.drop_procedure(&name) {
1724                    Ok(()) => {}
1725                    Err(_) if if_exists => {
1726                        return Ok(QueryResult::empty());
1727                    }
1728                    Err(e) => {
1729                        return Err(Error::Query(QueryError::new(
1730                            QueryErrorKind::Semantic,
1731                            e.to_string(),
1732                        )));
1733                    }
1734                }
1735                wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1736                Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1737            }
1738            SchemaStatement::ShowIndexes => {
1739                return self.execute_show_indexes();
1740            }
1741            SchemaStatement::ShowConstraints => {
1742                return self.execute_show_constraints();
1743            }
1744            SchemaStatement::ShowNodeTypes => {
1745                return self.execute_show_node_types();
1746            }
1747            SchemaStatement::ShowEdgeTypes => {
1748                return self.execute_show_edge_types();
1749            }
1750            SchemaStatement::ShowGraphTypes => {
1751                return self.execute_show_graph_types();
1752            }
1753            SchemaStatement::ShowGraphType(name) => {
1754                return self.execute_show_graph_type(&name);
1755            }
1756            SchemaStatement::ShowCurrentGraphType => {
1757                return self.execute_show_current_graph_type();
1758            }
1759            SchemaStatement::ShowGraphs => {
1760                return self.execute_show_graphs();
1761            }
1762            SchemaStatement::ShowSchemas => {
1763                return self.execute_show_schemas();
1764            }
1765        };
1766
1767        // Invalidate all cached query plans after any successful DDL change.
1768        // DDL is rare, so clearing the entire cache is cheap and correct.
1769        if result.is_ok() {
1770            self.query_cache.clear();
1771        }
1772
1773        result
1774    }
1775
/// Builds an HNSW vector index over `property` for the nodes carrying `label`.
///
/// The store is scanned once; every `Value::Vector` found under `property` is
/// copied out and then inserted into a freshly created index, which is finally
/// registered on `store`. When `dimensions` is `None`, the length of the first
/// vector encountered becomes the expected dimension. When `metric` is `None`,
/// cosine distance is used.
///
/// # Errors
///
/// Returns `Error::Internal` when `metric` is not a recognised name, when a
/// vector's length differs from the expected dimension, or when no vector was
/// found and `dimensions` was not supplied.
#[cfg(all(feature = "gql", feature = "vector-index"))]
fn create_vector_index_on_store(
    store: &LpgStore,
    label: &str,
    property: &str,
    dimensions: Option<usize>,
    metric: Option<&str>,
) -> Result<()> {
    use grafeo_common::types::{PropertyKey, Value};
    use grafeo_common::utils::error::Error;
    use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};

    // Resolve the distance function first so a bad name fails before any scanning.
    let distance = match metric {
        None => DistanceMetric::Cosine,
        Some(requested) => {
            let Some(parsed) = DistanceMetric::from_str(requested) else {
                return Err(Error::Internal(format!(
                    "Unknown distance metric '{requested}'. Use: cosine, euclidean, dot_product, manhattan"
                )));
            };
            parsed
        }
    };

    let key = PropertyKey::new(property);
    let mut expected_dims = dimensions;
    let mut pending: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();

    for node in store.nodes_with_label(label) {
        // Nodes without a vector under this property are simply not indexed.
        let Some(Value::Vector(embedding)) = node.properties.get(&key) else {
            continue;
        };
        match expected_dims {
            Some(expected) if embedding.len() != expected => {
                return Err(Error::Internal(format!(
                    "Vector dimension mismatch: expected {expected}, found {} on node {}",
                    embedding.len(),
                    node.id.0
                )));
            }
            Some(_) => {}
            // No explicit dimension: the first vector seen fixes it.
            None => expected_dims = Some(embedding.len()),
        }
        pending.push((node.id, embedding.to_vec()));
    }

    let dims = expected_dims.ok_or_else(|| {
        Error::Internal(format!(
            "No vector properties found on :{label}({property}) and no dimensions specified"
        ))
    })?;

    let index = HnswIndex::with_capacity(HnswConfig::new(dims, distance), pending.len());
    let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
    for (id, embedding) in &pending {
        index.insert(*id, embedding, &accessor);
    }

    store.add_vector_index(label, property, Arc::new(index));
    Ok(())
}
1835
/// Fallback compiled when `gql` is enabled but `vector-index` is not.
///
/// Ignores all arguments and always fails with `Error::Internal`, telling the
/// caller which feature is required.
#[cfg(all(feature = "gql", not(feature = "vector-index")))]
fn create_vector_index_on_store(
    _store: &LpgStore,
    _label: &str,
    _property: &str,
    _dimensions: Option<usize>,
    _metric: Option<&str>,
) -> Result<()> {
    use grafeo_common::utils::error::Error;

    let reason = String::from("Vector index support requires the 'vector-index' feature");
    Err(Error::Internal(reason))
}
1849
/// Builds an inverted text index over `property` for the nodes carrying `label`.
///
/// The index is created with the default BM25 configuration. Nodes whose
/// `property` is absent or is not a `Value::String` are skipped. The finished
/// index is wrapped in a `parking_lot::RwLock` and registered on `store`.
/// This function always returns `Ok(())`.
#[cfg(all(feature = "gql", feature = "text-index"))]
fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
    use grafeo_common::types::{PropertyKey, Value};
    use grafeo_core::index::text::{BM25Config, InvertedIndex};

    let key = PropertyKey::new(property);
    let mut inverted = InvertedIndex::new(BM25Config::default());

    for id in store.nodes_by_label(label) {
        // Only string-valued properties contribute text to the index.
        let Some(Value::String(body)) = store.get_node_property(id, &key) else {
            continue;
        };
        inverted.insert(id, body.as_str());
    }

    let shared = Arc::new(parking_lot::RwLock::new(inverted));
    store.add_text_index(label, property, shared);
    Ok(())
}
1869
/// Fallback compiled when `gql` is enabled but `text-index` is not.
///
/// Ignores all arguments and always fails with `Error::Internal`, telling the
/// caller which feature is required.
#[cfg(all(feature = "gql", not(feature = "text-index")))]
fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
    use grafeo_common::utils::error::Error;

    let reason = String::from("Text index support requires the 'text-index' feature");
    Err(Error::Internal(reason))
}
1877
1878    /// Returns a table of all indexes from the catalog.
1879    fn execute_show_indexes(&self) -> Result<QueryResult> {
1880        let indexes = self.catalog.all_indexes();
1881        let columns = vec![
1882            "name".to_string(),
1883            "type".to_string(),
1884            "label".to_string(),
1885            "property".to_string(),
1886        ];
1887        let rows: Vec<Vec<Value>> = indexes
1888            .into_iter()
1889            .map(|def| {
1890                let label_name = self
1891                    .catalog
1892                    .get_label_name(def.label)
1893                    .unwrap_or_else(|| "?".into());
1894                let prop_name = self
1895                    .catalog
1896                    .get_property_key_name(def.property_key)
1897                    .unwrap_or_else(|| "?".into());
1898                vec![
1899                    Value::from(def.name),
1900                    Value::from(format!("{:?}", def.index_type)),
1901                    Value::from(&*label_name),
1902                    Value::from(&*prop_name),
1903                ]
1904            })
1905            .collect();
1906        Ok(QueryResult {
1907            columns,
1908            column_types: Vec::new(),
1909            rows,
1910            ..QueryResult::empty()
1911        })
1912    }
1913
1914    /// Returns a table of all constraints (currently metadata-only).
1915    fn execute_show_constraints(&self) -> Result<QueryResult> {
1916        // Constraints are tracked in WAL but not yet in a queryable catalog.
1917        // Return an empty table with the expected schema.
1918        Ok(QueryResult {
1919            columns: vec![
1920                "name".to_string(),
1921                "type".to_string(),
1922                "label".to_string(),
1923                "properties".to_string(),
1924            ],
1925            column_types: Vec::new(),
1926            rows: Vec::new(),
1927            ..QueryResult::empty()
1928        })
1929    }
1930
1931    /// Returns a table of all registered node types in the current schema.
1932    fn execute_show_node_types(&self) -> Result<QueryResult> {
1933        let columns = vec![
1934            "name".to_string(),
1935            "properties".to_string(),
1936            "constraints".to_string(),
1937            "parents".to_string(),
1938        ];
1939        let schema = self.current_schema.lock().clone();
1940        let all_names = self.catalog.all_node_type_names();
1941        let type_names: Vec<String> = match &schema {
1942            Some(s) => {
1943                let prefix = format!("{s}/");
1944                all_names
1945                    .into_iter()
1946                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
1947                    .collect()
1948            }
1949            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
1950        };
1951        let rows: Vec<Vec<Value>> = type_names
1952            .into_iter()
1953            .filter_map(|name| {
1954                let lookup = match &schema {
1955                    Some(s) => format!("{s}/{name}"),
1956                    None => name.clone(),
1957                };
1958                let def = self.catalog.get_node_type(&lookup)?;
1959                let props: Vec<String> = def
1960                    .properties
1961                    .iter()
1962                    .map(|p| {
1963                        let nullable = if p.nullable { "" } else { " NOT NULL" };
1964                        format!("{} {}{}", p.name, p.data_type, nullable)
1965                    })
1966                    .collect();
1967                let constraints: Vec<String> =
1968                    def.constraints.iter().map(|c| format!("{c:?}")).collect();
1969                let parents = def.parent_types.join(", ");
1970                Some(vec![
1971                    Value::from(name),
1972                    Value::from(props.join(", ")),
1973                    Value::from(constraints.join(", ")),
1974                    Value::from(parents),
1975                ])
1976            })
1977            .collect();
1978        Ok(QueryResult {
1979            columns,
1980            column_types: Vec::new(),
1981            rows,
1982            ..QueryResult::empty()
1983        })
1984    }
1985
1986    /// Returns a table of all registered edge types in the current schema.
1987    fn execute_show_edge_types(&self) -> Result<QueryResult> {
1988        let columns = vec![
1989            "name".to_string(),
1990            "properties".to_string(),
1991            "source_types".to_string(),
1992            "target_types".to_string(),
1993        ];
1994        let schema = self.current_schema.lock().clone();
1995        let all_names = self.catalog.all_edge_type_names();
1996        let type_names: Vec<String> = match &schema {
1997            Some(s) => {
1998                let prefix = format!("{s}/");
1999                all_names
2000                    .into_iter()
2001                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2002                    .collect()
2003            }
2004            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2005        };
2006        let rows: Vec<Vec<Value>> = type_names
2007            .into_iter()
2008            .filter_map(|name| {
2009                let lookup = match &schema {
2010                    Some(s) => format!("{s}/{name}"),
2011                    None => name.clone(),
2012                };
2013                let def = self.catalog.get_edge_type_def(&lookup)?;
2014                let props: Vec<String> = def
2015                    .properties
2016                    .iter()
2017                    .map(|p| {
2018                        let nullable = if p.nullable { "" } else { " NOT NULL" };
2019                        format!("{} {}{}", p.name, p.data_type, nullable)
2020                    })
2021                    .collect();
2022                let src = def.source_node_types.join(", ");
2023                let tgt = def.target_node_types.join(", ");
2024                Some(vec![
2025                    Value::from(name),
2026                    Value::from(props.join(", ")),
2027                    Value::from(src),
2028                    Value::from(tgt),
2029                ])
2030            })
2031            .collect();
2032        Ok(QueryResult {
2033            columns,
2034            column_types: Vec::new(),
2035            rows,
2036            ..QueryResult::empty()
2037        })
2038    }
2039
2040    /// Returns a table of all registered graph types in the current schema.
2041    fn execute_show_graph_types(&self) -> Result<QueryResult> {
2042        let columns = vec![
2043            "name".to_string(),
2044            "open".to_string(),
2045            "node_types".to_string(),
2046            "edge_types".to_string(),
2047        ];
2048        let schema = self.current_schema.lock().clone();
2049        let all_names = self.catalog.all_graph_type_names();
2050        let type_names: Vec<String> = match &schema {
2051            Some(s) => {
2052                let prefix = format!("{s}/");
2053                all_names
2054                    .into_iter()
2055                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2056                    .collect()
2057            }
2058            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2059        };
2060        let rows: Vec<Vec<Value>> = type_names
2061            .into_iter()
2062            .filter_map(|name| {
2063                let lookup = match &schema {
2064                    Some(s) => format!("{s}/{name}"),
2065                    None => name.clone(),
2066                };
2067                let def = self.catalog.get_graph_type_def(&lookup)?;
2068                // Strip schema prefix from allowed type names for display
2069                let strip = |n: &String| -> String {
2070                    match &schema {
2071                        Some(s) => n.strip_prefix(&format!("{s}/")).unwrap_or(n).to_string(),
2072                        None => n.clone(),
2073                    }
2074                };
2075                let node_types: Vec<String> = def.allowed_node_types.iter().map(strip).collect();
2076                let edge_types: Vec<String> = def.allowed_edge_types.iter().map(strip).collect();
2077                Some(vec![
2078                    Value::from(name),
2079                    Value::from(def.open),
2080                    Value::from(node_types.join(", ")),
2081                    Value::from(edge_types.join(", ")),
2082                ])
2083            })
2084            .collect();
2085        Ok(QueryResult {
2086            columns,
2087            column_types: Vec::new(),
2088            rows,
2089            ..QueryResult::empty()
2090        })
2091    }
2092
2093    /// Returns the list of named graphs visible in the current schema context.
2094    ///
2095    /// When a session schema is set, only graphs belonging to that schema are
2096    /// shown (their compound prefix is stripped). When no schema is set, graphs
2097    /// without a schema prefix are shown (the default schema).
2098    fn execute_show_graphs(&self) -> Result<QueryResult> {
2099        let schema = self.current_schema.lock().clone();
2100        let all_names = self.store.graph_names();
2101
2102        let mut names: Vec<String> = match &schema {
2103            Some(s) => {
2104                let prefix = format!("{s}/");
2105                all_names
2106                    .into_iter()
2107                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2108                    .collect()
2109            }
2110            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2111        };
2112        names.sort();
2113
2114        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2115        Ok(QueryResult {
2116            columns: vec!["name".to_string()],
2117            column_types: Vec::new(),
2118            rows,
2119            ..QueryResult::empty()
2120        })
2121    }
2122
2123    /// Returns the list of all schema namespaces.
2124    fn execute_show_schemas(&self) -> Result<QueryResult> {
2125        let mut names = self.catalog.schema_names();
2126        names.sort();
2127        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2128        Ok(QueryResult {
2129            columns: vec!["name".to_string()],
2130            column_types: Vec::new(),
2131            rows,
2132            ..QueryResult::empty()
2133        })
2134    }
2135
2136    /// Returns detailed info for a specific graph type.
2137    fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
2138        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
2139
2140        let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
2141            Error::Query(QueryError::new(
2142                QueryErrorKind::Semantic,
2143                format!("Graph type '{name}' not found"),
2144            ))
2145        })?;
2146
2147        let columns = vec![
2148            "name".to_string(),
2149            "open".to_string(),
2150            "node_types".to_string(),
2151            "edge_types".to_string(),
2152        ];
2153        let rows = vec![vec![
2154            Value::from(def.name),
2155            Value::from(def.open),
2156            Value::from(def.allowed_node_types.join(", ")),
2157            Value::from(def.allowed_edge_types.join(", ")),
2158        ]];
2159        Ok(QueryResult {
2160            columns,
2161            column_types: Vec::new(),
2162            rows,
2163            ..QueryResult::empty()
2164        })
2165    }
2166
2167    /// Returns the graph type bound to the current graph.
2168    fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
2169        let graph_name = self
2170            .current_graph()
2171            .unwrap_or_else(|| "default".to_string());
2172        let columns = vec![
2173            "graph".to_string(),
2174            "graph_type".to_string(),
2175            "open".to_string(),
2176            "node_types".to_string(),
2177            "edge_types".to_string(),
2178        ];
2179
2180        if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
2181            && let Some(def) = self.catalog.get_graph_type_def(&type_name)
2182        {
2183            let rows = vec![vec![
2184                Value::from(graph_name),
2185                Value::from(type_name),
2186                Value::from(def.open),
2187                Value::from(def.allowed_node_types.join(", ")),
2188                Value::from(def.allowed_edge_types.join(", ")),
2189            ]];
2190            return Ok(QueryResult {
2191                columns,
2192                column_types: Vec::new(),
2193                rows,
2194                ..QueryResult::empty()
2195            });
2196        }
2197
2198        // No graph type binding found
2199        Ok(QueryResult {
2200            columns,
2201            column_types: Vec::new(),
2202            rows: vec![vec![
2203                Value::from(graph_name),
2204                Value::Null,
2205                Value::Null,
2206                Value::Null,
2207                Value::Null,
2208            ]],
2209            ..QueryResult::empty()
2210        })
2211    }
2212
2213    /// Executes a GQL query.
2214    ///
2215    /// # Errors
2216    ///
2217    /// Returns an error if the query fails to parse or execute.
2218    ///
2219    /// # Examples
2220    ///
2221    /// ```no_run
2222    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2223    /// use grafeo_engine::GrafeoDB;
2224    ///
2225    /// let db = GrafeoDB::new_in_memory();
2226    /// let session = db.session();
2227    ///
2228    /// // Create a node
2229    /// session.execute("INSERT (:Person {name: 'Alix', age: 30})")?;
2230    ///
2231    /// // Query nodes
2232    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
2233    /// for row in &result.rows {
2234    ///     println!("{:?}", row);
2235    /// }
2236    /// # Ok(())
2237    /// # }
2238    /// ```
2239    #[cfg(feature = "gql")]
2240    pub fn execute(&self, query: &str) -> Result<QueryResult> {
2241        self.require_lpg("GQL")?;
2242
2243        use crate::query::{
2244            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2245            processor::QueryLanguage, translators::gql,
2246        };
2247
2248        let _span = grafeo_info_span!(
2249            "grafeo::session::execute",
2250            language = "gql",
2251            query_len = query.len(),
2252        );
2253
2254        #[cfg(not(target_arch = "wasm32"))]
2255        let start_time = std::time::Instant::now();
2256
2257        // Parse and translate, checking for session/schema commands first
2258        let translation = gql::translate_full(query)?;
2259        let logical_plan = match translation {
2260            gql::GqlTranslationResult::SessionCommand(cmd) => {
2261                return self.execute_session_command(cmd);
2262            }
2263            gql::GqlTranslationResult::SchemaCommand(cmd) => {
2264                // All DDL is a write operation
2265                if *self.read_only_tx.lock() {
2266                    return Err(grafeo_common::utils::error::Error::Transaction(
2267                        grafeo_common::utils::error::TransactionError::ReadOnly,
2268                    ));
2269                }
2270                return self.execute_schema_command(cmd);
2271            }
2272            gql::GqlTranslationResult::Plan(plan) => {
2273                // Block mutations in read-only transactions
2274                if *self.read_only_tx.lock() && plan.root.has_mutations() {
2275                    return Err(grafeo_common::utils::error::Error::Transaction(
2276                        grafeo_common::utils::error::TransactionError::ReadOnly,
2277                    ));
2278                }
2279                plan
2280            }
2281        };
2282
2283        // Create cache key for this query
2284        let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
2285
2286        // Try to get cached optimized plan, or use the plan we just translated
2287        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2288            cached_plan
2289        } else {
2290            // Semantic validation
2291            let mut binder = Binder::new();
2292            let _binding_context = binder.bind(&logical_plan)?;
2293
2294            // Optimize the plan
2295            let active = self.active_store();
2296            let optimizer = Optimizer::from_graph_store(&*active);
2297            let plan = optimizer.optimize(logical_plan)?;
2298
2299            // Cache the optimized plan for future use
2300            self.query_cache.put_optimized(cache_key, plan.clone());
2301
2302            plan
2303        };
2304
2305        // Resolve the active store for query execution
2306        let active = self.active_store();
2307
2308        // EXPLAIN: annotate pushdown hints and return the plan tree
2309        if optimized_plan.explain {
2310            use crate::query::processor::{annotate_pushdown_hints, explain_result};
2311            let mut plan = optimized_plan;
2312            annotate_pushdown_hints(&mut plan.root, active.as_ref());
2313            return Ok(explain_result(&plan));
2314        }
2315
2316        // PROFILE: execute with per-operator instrumentation
2317        if optimized_plan.profile {
2318            let has_mutations = optimized_plan.root.has_mutations();
2319            return self.with_auto_commit(has_mutations, || {
2320                let (viewing_epoch, transaction_id) = self.get_transaction_context();
2321                let planner = self.create_planner_for_store(
2322                    Arc::clone(&active),
2323                    viewing_epoch,
2324                    transaction_id,
2325                );
2326                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2327
2328                let executor = Executor::with_columns(physical_plan.columns.clone())
2329                    .with_deadline(self.query_deadline());
2330                let _result = executor.execute(physical_plan.operator.as_mut())?;
2331
2332                let total_time_ms;
2333                #[cfg(not(target_arch = "wasm32"))]
2334                {
2335                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2336                }
2337                #[cfg(target_arch = "wasm32")]
2338                {
2339                    total_time_ms = 0.0;
2340                }
2341
2342                let profile_tree = crate::query::profile::build_profile_tree(
2343                    &optimized_plan.root,
2344                    &mut entries.into_iter(),
2345                );
2346                Ok(crate::query::profile::profile_result(
2347                    &profile_tree,
2348                    total_time_ms,
2349                ))
2350            });
2351        }
2352
2353        let has_mutations = optimized_plan.root.has_mutations();
2354
2355        let result = self.with_auto_commit(has_mutations, || {
2356            // Get transaction context for MVCC visibility
2357            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2358
2359            // Convert to physical plan with transaction context
2360            // (Physical planning cannot be cached as it depends on transaction state)
2361            // Safe to use read-only fast path when: this query has no mutations AND
2362            // there is no active transaction that may have prior uncommitted writes.
2363            let has_active_tx = self.current_transaction.lock().is_some();
2364            let read_only = !has_mutations && !has_active_tx;
2365            let planner = self.create_planner_for_store_with_read_only(
2366                Arc::clone(&active),
2367                viewing_epoch,
2368                transaction_id,
2369                read_only,
2370            );
2371            let mut physical_plan = planner.plan(&optimized_plan)?;
2372
2373            // Execute the plan
2374            let executor = Executor::with_columns(physical_plan.columns.clone())
2375                .with_deadline(self.query_deadline());
2376            let mut result = executor.execute(physical_plan.operator.as_mut())?;
2377
2378            // Add execution metrics
2379            let rows_scanned = result.rows.len() as u64;
2380            #[cfg(not(target_arch = "wasm32"))]
2381            {
2382                let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2383                result.execution_time_ms = Some(elapsed_ms);
2384            }
2385            result.rows_scanned = Some(rows_scanned);
2386
2387            Ok(result)
2388        });
2389
2390        // Record metrics for this query execution.
2391        #[cfg(feature = "metrics")]
2392        {
2393            #[cfg(not(target_arch = "wasm32"))]
2394            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2395            #[cfg(target_arch = "wasm32")]
2396            let elapsed_ms = None;
2397            self.record_query_metrics("gql", elapsed_ms, &result);
2398        }
2399
2400        result
2401    }
2402
2403    /// Executes a GQL query with visibility at the specified epoch.
2404    ///
2405    /// This enables time-travel queries: the query sees the database
2406    /// as it existed at the given epoch.
2407    ///
2408    /// # Errors
2409    ///
2410    /// Returns an error if parsing or execution fails.
2411    #[cfg(feature = "gql")]
2412    pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
2413        let previous = self.viewing_epoch_override.lock().replace(epoch);
2414        let result = self.execute(query);
2415        *self.viewing_epoch_override.lock() = previous;
2416        result
2417    }
2418
2419    /// Executes a GQL query at a specific epoch with optional parameters.
2420    ///
2421    /// Combines epoch-based time travel with parameterized queries.
2422    ///
2423    /// # Errors
2424    ///
2425    /// Returns an error if parsing or execution fails.
2426    #[cfg(feature = "gql")]
2427    pub fn execute_at_epoch_with_params(
2428        &self,
2429        query: &str,
2430        epoch: EpochId,
2431        params: Option<std::collections::HashMap<String, Value>>,
2432    ) -> Result<QueryResult> {
2433        let previous = self.viewing_epoch_override.lock().replace(epoch);
2434        let result = if let Some(p) = params {
2435            self.execute_with_params(query, p)
2436        } else {
2437            self.execute(query)
2438        };
2439        *self.viewing_epoch_override.lock() = previous;
2440        result
2441    }
2442
2443    /// Executes a GQL query with parameters.
2444    ///
2445    /// # Errors
2446    ///
2447    /// Returns an error if the query fails to parse or execute.
2448    #[cfg(feature = "gql")]
2449    pub fn execute_with_params(
2450        &self,
2451        query: &str,
2452        params: std::collections::HashMap<String, Value>,
2453    ) -> Result<QueryResult> {
2454        self.require_lpg("GQL")?;
2455
2456        use crate::query::processor::{QueryLanguage, QueryProcessor};
2457
2458        let has_mutations = Self::query_looks_like_mutation(query);
2459        let active = self.active_store();
2460
2461        self.with_auto_commit(has_mutations, || {
2462            // Get transaction context for MVCC visibility
2463            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2464
2465            // Create processor with transaction context
2466            let processor = QueryProcessor::for_stores_with_transaction(
2467                Arc::clone(&active),
2468                self.active_write_store(),
2469                Arc::clone(&self.transaction_manager),
2470            )?;
2471
2472            // Apply transaction context if in a transaction
2473            let processor = if let Some(transaction_id) = transaction_id {
2474                processor.with_transaction_context(viewing_epoch, transaction_id)
2475            } else {
2476                processor
2477            };
2478
2479            processor.process(query, QueryLanguage::Gql, Some(&params))
2480        })
2481    }
2482
2483    /// Executes a GQL query with parameters.
2484    ///
2485    /// # Errors
2486    ///
2487    /// Returns an error if no query language is enabled.
2488    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2489    pub fn execute_with_params(
2490        &self,
2491        _query: &str,
2492        _params: std::collections::HashMap<String, Value>,
2493    ) -> Result<QueryResult> {
2494        Err(grafeo_common::utils::error::Error::Internal(
2495            "No query language enabled".to_string(),
2496        ))
2497    }
2498
2499    /// Executes a GQL query.
2500    ///
2501    /// # Errors
2502    ///
2503    /// Returns an error if no query language is enabled.
2504    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2505    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2506        Err(grafeo_common::utils::error::Error::Internal(
2507            "No query language enabled".to_string(),
2508        ))
2509    }
2510
2511    /// Executes a Cypher query.
2512    ///
2513    /// # Errors
2514    ///
2515    /// Returns an error if the query fails to parse or execute.
2516    #[cfg(feature = "cypher")]
2517    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
2518        use crate::query::{
2519            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2520            processor::QueryLanguage, translators::cypher,
2521        };
2522        use grafeo_common::utils::error::{Error as GrafeoError, QueryError, QueryErrorKind};
2523
2524        // Handle schema DDL and SHOW commands before the normal query path
2525        let translation = cypher::translate_full(query)?;
2526        match translation {
2527            cypher::CypherTranslationResult::SchemaCommand(cmd) => {
2528                if *self.read_only_tx.lock() {
2529                    return Err(GrafeoError::Query(QueryError::new(
2530                        QueryErrorKind::Semantic,
2531                        "Cannot execute schema DDL in a read-only transaction",
2532                    )));
2533                }
2534                return self.execute_schema_command(cmd);
2535            }
2536            cypher::CypherTranslationResult::ShowIndexes => {
2537                return self.execute_show_indexes();
2538            }
2539            cypher::CypherTranslationResult::ShowConstraints => {
2540                return self.execute_show_constraints();
2541            }
2542            cypher::CypherTranslationResult::ShowCurrentGraphType => {
2543                return self.execute_show_current_graph_type();
2544            }
2545            cypher::CypherTranslationResult::Plan(_) => {
2546                // Fall through to normal execution below
2547            }
2548        }
2549
2550        #[cfg(not(target_arch = "wasm32"))]
2551        let start_time = std::time::Instant::now();
2552
2553        // Create cache key for this query
2554        let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());
2555
2556        // Try to get cached optimized plan
2557        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2558            cached_plan
2559        } else {
2560            // Parse and translate the query to a logical plan
2561            let logical_plan = cypher::translate(query)?;
2562
2563            // Semantic validation
2564            let mut binder = Binder::new();
2565            let _binding_context = binder.bind(&logical_plan)?;
2566
2567            // Optimize the plan
2568            let active = self.active_store();
2569            let optimizer = Optimizer::from_graph_store(&*active);
2570            let plan = optimizer.optimize(logical_plan)?;
2571
2572            // Cache the optimized plan
2573            self.query_cache.put_optimized(cache_key, plan.clone());
2574
2575            plan
2576        };
2577
2578        // Resolve the active store for query execution
2579        let active = self.active_store();
2580
2581        // EXPLAIN
2582        if optimized_plan.explain {
2583            use crate::query::processor::{annotate_pushdown_hints, explain_result};
2584            let mut plan = optimized_plan;
2585            annotate_pushdown_hints(&mut plan.root, active.as_ref());
2586            return Ok(explain_result(&plan));
2587        }
2588
2589        // PROFILE
2590        if optimized_plan.profile {
2591            let has_mutations = optimized_plan.root.has_mutations();
2592            return self.with_auto_commit(has_mutations, || {
2593                let (viewing_epoch, transaction_id) = self.get_transaction_context();
2594                let planner = self.create_planner_for_store(
2595                    Arc::clone(&active),
2596                    viewing_epoch,
2597                    transaction_id,
2598                );
2599                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2600
2601                let executor = Executor::with_columns(physical_plan.columns.clone())
2602                    .with_deadline(self.query_deadline());
2603                let _result = executor.execute(physical_plan.operator.as_mut())?;
2604
2605                let total_time_ms;
2606                #[cfg(not(target_arch = "wasm32"))]
2607                {
2608                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2609                }
2610                #[cfg(target_arch = "wasm32")]
2611                {
2612                    total_time_ms = 0.0;
2613                }
2614
2615                let profile_tree = crate::query::profile::build_profile_tree(
2616                    &optimized_plan.root,
2617                    &mut entries.into_iter(),
2618                );
2619                Ok(crate::query::profile::profile_result(
2620                    &profile_tree,
2621                    total_time_ms,
2622                ))
2623            });
2624        }
2625
2626        let has_mutations = optimized_plan.root.has_mutations();
2627
2628        let result = self.with_auto_commit(has_mutations, || {
2629            // Get transaction context for MVCC visibility
2630            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2631
2632            // Convert to physical plan with transaction context
2633            let planner =
2634                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2635            let mut physical_plan = planner.plan(&optimized_plan)?;
2636
2637            // Execute the plan
2638            let executor = Executor::with_columns(physical_plan.columns.clone())
2639                .with_deadline(self.query_deadline());
2640            executor.execute(physical_plan.operator.as_mut())
2641        });
2642
2643        #[cfg(feature = "metrics")]
2644        {
2645            #[cfg(not(target_arch = "wasm32"))]
2646            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2647            #[cfg(target_arch = "wasm32")]
2648            let elapsed_ms = None;
2649            self.record_query_metrics("cypher", elapsed_ms, &result);
2650        }
2651
2652        result
2653    }
2654
2655    /// Executes a Gremlin query.
2656    ///
2657    /// # Errors
2658    ///
2659    /// Returns an error if the query fails to parse or execute.
2660    ///
2661    /// # Examples
2662    ///
2663    /// ```no_run
2664    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2665    /// use grafeo_engine::GrafeoDB;
2666    ///
2667    /// let db = GrafeoDB::new_in_memory();
2668    /// let session = db.session();
2669    ///
2670    /// // Create some nodes first
2671    /// session.create_node(&["Person"]);
2672    ///
2673    /// // Query using Gremlin
2674    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
2675    /// # Ok(())
2676    /// # }
2677    /// ```
2678    #[cfg(feature = "gremlin")]
2679    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
2680        use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};
2681
2682        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2683        let start_time = Instant::now();
2684
2685        // Parse and translate the query to a logical plan
2686        let logical_plan = gremlin::translate(query)?;
2687
2688        // Semantic validation
2689        let mut binder = Binder::new();
2690        let _binding_context = binder.bind(&logical_plan)?;
2691
2692        // Optimize the plan
2693        let active = self.active_store();
2694        let optimizer = Optimizer::from_graph_store(&*active);
2695        let optimized_plan = optimizer.optimize(logical_plan)?;
2696
2697        let has_mutations = optimized_plan.root.has_mutations();
2698
2699        let result = self.with_auto_commit(has_mutations, || {
2700            // Get transaction context for MVCC visibility
2701            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2702
2703            // Convert to physical plan with transaction context
2704            let planner =
2705                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2706            let mut physical_plan = planner.plan(&optimized_plan)?;
2707
2708            // Execute the plan
2709            let executor = Executor::with_columns(physical_plan.columns.clone())
2710                .with_deadline(self.query_deadline());
2711            executor.execute(physical_plan.operator.as_mut())
2712        });
2713
2714        #[cfg(feature = "metrics")]
2715        {
2716            #[cfg(not(target_arch = "wasm32"))]
2717            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2718            #[cfg(target_arch = "wasm32")]
2719            let elapsed_ms = None;
2720            self.record_query_metrics("gremlin", elapsed_ms, &result);
2721        }
2722
2723        result
2724    }
2725
2726    /// Executes a Gremlin query with parameters.
2727    ///
2728    /// # Errors
2729    ///
2730    /// Returns an error if the query fails to parse or execute.
2731    #[cfg(feature = "gremlin")]
2732    pub fn execute_gremlin_with_params(
2733        &self,
2734        query: &str,
2735        params: std::collections::HashMap<String, Value>,
2736    ) -> Result<QueryResult> {
2737        use crate::query::processor::{QueryLanguage, QueryProcessor};
2738
2739        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2740        let start_time = Instant::now();
2741
2742        let has_mutations = Self::query_looks_like_mutation(query);
2743        let active = self.active_store();
2744
2745        let result = self.with_auto_commit(has_mutations, || {
2746            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2747            let processor = QueryProcessor::for_stores_with_transaction(
2748                Arc::clone(&active),
2749                self.active_write_store(),
2750                Arc::clone(&self.transaction_manager),
2751            )?;
2752            let processor = if let Some(transaction_id) = transaction_id {
2753                processor.with_transaction_context(viewing_epoch, transaction_id)
2754            } else {
2755                processor
2756            };
2757            processor.process(query, QueryLanguage::Gremlin, Some(&params))
2758        });
2759
2760        #[cfg(feature = "metrics")]
2761        {
2762            #[cfg(not(target_arch = "wasm32"))]
2763            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2764            #[cfg(target_arch = "wasm32")]
2765            let elapsed_ms = None;
2766            self.record_query_metrics("gremlin", elapsed_ms, &result);
2767        }
2768
2769        result
2770    }
2771
2772    /// Executes a GraphQL query against the LPG store.
2773    ///
2774    /// # Errors
2775    ///
2776    /// Returns an error if the query fails to parse or execute.
2777    ///
2778    /// # Examples
2779    ///
2780    /// ```no_run
2781    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2782    /// use grafeo_engine::GrafeoDB;
2783    ///
2784    /// let db = GrafeoDB::new_in_memory();
2785    /// let session = db.session();
2786    ///
2787    /// // Create some nodes first
2788    /// session.create_node(&["User"]);
2789    ///
2790    /// // Query using GraphQL
2791    /// let result = session.execute_graphql("query { user { id name } }")?;
2792    /// # Ok(())
2793    /// # }
2794    /// ```
2795    #[cfg(feature = "graphql")]
2796    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
2797        use crate::query::{
2798            Executor, binder::Binder, optimizer::Optimizer, processor::substitute_params,
2799            translators::graphql,
2800        };
2801
2802        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2803        let start_time = Instant::now();
2804
2805        let mut logical_plan = graphql::translate(query)?;
2806
2807        // Substitute default parameter values from variable declarations
2808        if !logical_plan.default_params.is_empty() {
2809            let defaults = logical_plan.default_params.clone();
2810            substitute_params(&mut logical_plan, &defaults)?;
2811        }
2812
2813        let mut binder = Binder::new();
2814        let _binding_context = binder.bind(&logical_plan)?;
2815
2816        let active = self.active_store();
2817        let optimizer = Optimizer::from_graph_store(&*active);
2818        let optimized_plan = optimizer.optimize(logical_plan)?;
2819        let has_mutations = optimized_plan.root.has_mutations();
2820
2821        let result = self.with_auto_commit(has_mutations, || {
2822            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2823            let planner =
2824                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2825            let mut physical_plan = planner.plan(&optimized_plan)?;
2826            let executor = Executor::with_columns(physical_plan.columns.clone())
2827                .with_deadline(self.query_deadline());
2828            executor.execute(physical_plan.operator.as_mut())
2829        });
2830
2831        #[cfg(feature = "metrics")]
2832        {
2833            #[cfg(not(target_arch = "wasm32"))]
2834            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2835            #[cfg(target_arch = "wasm32")]
2836            let elapsed_ms = None;
2837            self.record_query_metrics("graphql", elapsed_ms, &result);
2838        }
2839
2840        result
2841    }
2842
2843    /// Executes a GraphQL query with parameters.
2844    ///
2845    /// # Errors
2846    ///
2847    /// Returns an error if the query fails to parse or execute.
2848    #[cfg(feature = "graphql")]
2849    pub fn execute_graphql_with_params(
2850        &self,
2851        query: &str,
2852        params: std::collections::HashMap<String, Value>,
2853    ) -> Result<QueryResult> {
2854        use crate::query::processor::{QueryLanguage, QueryProcessor};
2855
2856        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2857        let start_time = Instant::now();
2858
2859        let has_mutations = Self::query_looks_like_mutation(query);
2860        let active = self.active_store();
2861
2862        let result = self.with_auto_commit(has_mutations, || {
2863            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2864            let processor = QueryProcessor::for_stores_with_transaction(
2865                Arc::clone(&active),
2866                self.active_write_store(),
2867                Arc::clone(&self.transaction_manager),
2868            )?;
2869            let processor = if let Some(transaction_id) = transaction_id {
2870                processor.with_transaction_context(viewing_epoch, transaction_id)
2871            } else {
2872                processor
2873            };
2874            processor.process(query, QueryLanguage::GraphQL, Some(&params))
2875        });
2876
2877        #[cfg(feature = "metrics")]
2878        {
2879            #[cfg(not(target_arch = "wasm32"))]
2880            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2881            #[cfg(target_arch = "wasm32")]
2882            let elapsed_ms = None;
2883            self.record_query_metrics("graphql", elapsed_ms, &result);
2884        }
2885
2886        result
2887    }
2888
2889    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
2890    ///
2891    /// # Errors
2892    ///
2893    /// Returns an error if the query fails to parse or execute.
2894    ///
2895    /// # Examples
2896    ///
2897    /// ```no_run
2898    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2899    /// use grafeo_engine::GrafeoDB;
2900    ///
2901    /// let db = GrafeoDB::new_in_memory();
2902    /// let session = db.session();
2903    ///
2904    /// let result = session.execute_sql(
2905    ///     "SELECT * FROM GRAPH_TABLE (
2906    ///         MATCH (n:Person)
2907    ///         COLUMNS (n.name AS name)
2908    ///     )"
2909    /// )?;
2910    /// # Ok(())
2911    /// # }
2912    /// ```
2913    #[cfg(feature = "sql-pgq")]
2914    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
2915        use crate::query::{
2916            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
2917            processor::QueryLanguage, translators::sql_pgq,
2918        };
2919
2920        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2921        let start_time = Instant::now();
2922
2923        // Parse and translate (always needed to check for DDL)
2924        let logical_plan = sql_pgq::translate(query)?;
2925
2926        // Handle DDL statements directly (they don't go through the query pipeline)
2927        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
2928            return Ok(QueryResult {
2929                columns: vec!["status".into()],
2930                column_types: vec![grafeo_common::types::LogicalType::String],
2931                rows: vec![vec![Value::from(format!(
2932                    "Property graph '{}' created ({} node tables, {} edge tables)",
2933                    cpg.name,
2934                    cpg.node_tables.len(),
2935                    cpg.edge_tables.len()
2936                ))]],
2937                execution_time_ms: None,
2938                rows_scanned: None,
2939                status_message: None,
2940                gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2941            });
2942        }
2943
2944        let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());
2945
2946        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2947            cached_plan
2948        } else {
2949            let mut binder = Binder::new();
2950            let _binding_context = binder.bind(&logical_plan)?;
2951            let active = self.active_store();
2952            let optimizer = Optimizer::from_graph_store(&*active);
2953            let plan = optimizer.optimize(logical_plan)?;
2954            self.query_cache.put_optimized(cache_key, plan.clone());
2955            plan
2956        };
2957
2958        let active = self.active_store();
2959        let has_mutations = optimized_plan.root.has_mutations();
2960
2961        let result = self.with_auto_commit(has_mutations, || {
2962            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2963            let planner =
2964                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2965            let mut physical_plan = planner.plan(&optimized_plan)?;
2966            let executor = Executor::with_columns(physical_plan.columns.clone())
2967                .with_deadline(self.query_deadline());
2968            executor.execute(physical_plan.operator.as_mut())
2969        });
2970
2971        #[cfg(feature = "metrics")]
2972        {
2973            #[cfg(not(target_arch = "wasm32"))]
2974            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2975            #[cfg(target_arch = "wasm32")]
2976            let elapsed_ms = None;
2977            self.record_query_metrics("sql", elapsed_ms, &result);
2978        }
2979
2980        result
2981    }
2982
2983    /// Executes a SQL/PGQ query with parameters.
2984    ///
2985    /// # Errors
2986    ///
2987    /// Returns an error if the query fails to parse or execute.
2988    #[cfg(feature = "sql-pgq")]
2989    pub fn execute_sql_with_params(
2990        &self,
2991        query: &str,
2992        params: std::collections::HashMap<String, Value>,
2993    ) -> Result<QueryResult> {
2994        use crate::query::processor::{QueryLanguage, QueryProcessor};
2995
2996        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2997        let start_time = Instant::now();
2998
2999        let has_mutations = Self::query_looks_like_mutation(query);
3000        let active = self.active_store();
3001
3002        let result = self.with_auto_commit(has_mutations, || {
3003            let (viewing_epoch, transaction_id) = self.get_transaction_context();
3004            let processor = QueryProcessor::for_stores_with_transaction(
3005                Arc::clone(&active),
3006                self.active_write_store(),
3007                Arc::clone(&self.transaction_manager),
3008            )?;
3009            let processor = if let Some(transaction_id) = transaction_id {
3010                processor.with_transaction_context(viewing_epoch, transaction_id)
3011            } else {
3012                processor
3013            };
3014            processor.process(query, QueryLanguage::SqlPgq, Some(&params))
3015        });
3016
3017        #[cfg(feature = "metrics")]
3018        {
3019            #[cfg(not(target_arch = "wasm32"))]
3020            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3021            #[cfg(target_arch = "wasm32")]
3022            let elapsed_ms = None;
3023            self.record_query_metrics("sql", elapsed_ms, &result);
3024        }
3025
3026        result
3027    }
3028
3029    /// Executes a query in the specified language by name.
3030    ///
3031    /// Supported language names: `"gql"`, `"cypher"`, `"gremlin"`, `"graphql"`,
3032    /// `"graphql-rdf"`, `"sparql"`, `"sql"`. Each requires the corresponding feature flag.
3033    ///
3034    /// # Errors
3035    ///
3036    /// Returns an error if the language is unknown/disabled or the query fails.
3037    pub fn execute_language(
3038        &self,
3039        query: &str,
3040        language: &str,
3041        params: Option<std::collections::HashMap<String, Value>>,
3042    ) -> Result<QueryResult> {
3043        let _span = grafeo_info_span!(
3044            "grafeo::session::execute",
3045            language,
3046            query_len = query.len(),
3047        );
3048        match language {
3049            "gql" => {
3050                if let Some(p) = params {
3051                    self.execute_with_params(query, p)
3052                } else {
3053                    self.execute(query)
3054                }
3055            }
3056            #[cfg(feature = "cypher")]
3057            "cypher" => {
3058                if let Some(p) = params {
3059                    use crate::query::processor::{QueryLanguage, QueryProcessor};
3060
3061                    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
3062                    let start_time = Instant::now();
3063
3064                    let has_mutations = Self::query_looks_like_mutation(query);
3065                    let active = self.active_store();
3066                    let result = self.with_auto_commit(has_mutations, || {
3067                        let processor = QueryProcessor::for_stores_with_transaction(
3068                            Arc::clone(&active),
3069                            self.active_write_store(),
3070                            Arc::clone(&self.transaction_manager),
3071                        )?;
3072                        let (viewing_epoch, transaction_id) = self.get_transaction_context();
3073                        let processor = if let Some(transaction_id) = transaction_id {
3074                            processor.with_transaction_context(viewing_epoch, transaction_id)
3075                        } else {
3076                            processor
3077                        };
3078                        processor.process(query, QueryLanguage::Cypher, Some(&p))
3079                    });
3080
3081                    #[cfg(feature = "metrics")]
3082                    {
3083                        #[cfg(not(target_arch = "wasm32"))]
3084                        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3085                        #[cfg(target_arch = "wasm32")]
3086                        let elapsed_ms = None;
3087                        self.record_query_metrics("cypher", elapsed_ms, &result);
3088                    }
3089
3090                    result
3091                } else {
3092                    self.execute_cypher(query)
3093                }
3094            }
3095            #[cfg(feature = "gremlin")]
3096            "gremlin" => {
3097                if let Some(p) = params {
3098                    self.execute_gremlin_with_params(query, p)
3099                } else {
3100                    self.execute_gremlin(query)
3101                }
3102            }
3103            #[cfg(feature = "graphql")]
3104            "graphql" => {
3105                if let Some(p) = params {
3106                    self.execute_graphql_with_params(query, p)
3107                } else {
3108                    self.execute_graphql(query)
3109                }
3110            }
3111            #[cfg(all(feature = "graphql", feature = "rdf"))]
3112            "graphql-rdf" => {
3113                if let Some(p) = params {
3114                    self.execute_graphql_rdf_with_params(query, p)
3115                } else {
3116                    self.execute_graphql_rdf(query)
3117                }
3118            }
3119            #[cfg(feature = "sql-pgq")]
3120            "sql" | "sql-pgq" => {
3121                if let Some(p) = params {
3122                    self.execute_sql_with_params(query, p)
3123                } else {
3124                    self.execute_sql(query)
3125                }
3126            }
3127            #[cfg(all(feature = "sparql", feature = "rdf"))]
3128            "sparql" => {
3129                if let Some(p) = params {
3130                    self.execute_sparql_with_params(query, p)
3131                } else {
3132                    self.execute_sparql(query)
3133                }
3134            }
3135            other => Err(grafeo_common::utils::error::Error::Query(
3136                grafeo_common::utils::error::QueryError::new(
3137                    grafeo_common::utils::error::QueryErrorKind::Semantic,
3138                    format!("Unknown query language: '{other}'"),
3139                ),
3140            )),
3141        }
3142    }
3143
3144    /// Begins a new transaction.
3145    ///
3146    /// # Errors
3147    ///
3148    /// Returns an error if a transaction is already active.
3149    ///
3150    /// # Examples
3151    ///
3152    /// ```no_run
3153    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3154    /// use grafeo_engine::GrafeoDB;
3155    ///
3156    /// let db = GrafeoDB::new_in_memory();
3157    /// let mut session = db.session();
3158    ///
3159    /// session.begin_transaction()?;
3160    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3161    /// session.execute("INSERT (:Person {name: 'Gus'})")?;
3162    /// session.commit()?; // Both inserts committed atomically
3163    /// # Ok(())
3164    /// # }
3165    /// ```
3166    /// Clears all cached query plans.
3167    ///
3168    /// The plan cache is shared across all sessions on the same database,
3169    /// so clearing from one session affects all sessions.
3170    pub fn clear_plan_cache(&self) {
3171        self.query_cache.clear();
3172    }
3173
3174    /// Begins a new transaction on this session.
3175    ///
3176    /// Uses the default isolation level (`SnapshotIsolation`).
3177    ///
3178    /// # Errors
3179    ///
3180    /// Returns an error if a transaction is already active.
3181    pub fn begin_transaction(&mut self) -> Result<()> {
3182        self.begin_transaction_inner(false, None)
3183    }
3184
3185    /// Begins a transaction with a specific isolation level.
3186    ///
3187    /// See [`begin_transaction`](Self::begin_transaction) for the default (`SnapshotIsolation`).
3188    ///
3189    /// # Errors
3190    ///
3191    /// Returns an error if a transaction is already active.
3192    pub fn begin_transaction_with_isolation(
3193        &mut self,
3194        isolation_level: crate::transaction::IsolationLevel,
3195    ) -> Result<()> {
3196        self.begin_transaction_inner(false, Some(isolation_level))
3197    }
3198
3199    /// Core transaction begin logic, usable from both `&mut self` and `&self` paths.
3200    fn begin_transaction_inner(
3201        &self,
3202        read_only: bool,
3203        isolation_level: Option<crate::transaction::IsolationLevel>,
3204    ) -> Result<()> {
3205        let _span = grafeo_debug_span!("grafeo::tx::begin", read_only);
3206        let mut current = self.current_transaction.lock();
3207        if current.is_some() {
3208            // Nested transaction: create an auto-savepoint instead of a new tx.
3209            drop(current);
3210            let mut depth = self.transaction_nesting_depth.lock();
3211            *depth += 1;
3212            let sp_name = format!("_nested_tx_{}", *depth);
3213            self.savepoint(&sp_name)?;
3214            return Ok(());
3215        }
3216
3217        let active = self.active_lpg_store();
3218        self.transaction_start_node_count
3219            .store(active.node_count(), Ordering::Relaxed);
3220        self.transaction_start_edge_count
3221            .store(active.edge_count(), Ordering::Relaxed);
3222        let transaction_id = if let Some(level) = isolation_level {
3223            self.transaction_manager.begin_with_isolation(level)
3224        } else {
3225            self.transaction_manager.begin()
3226        };
3227        *current = Some(transaction_id);
3228        *self.read_only_tx.lock() = read_only || self.db_read_only;
3229
3230        // Record the initial graph as "touched" for cross-graph atomicity.
3231        // Uses the full storage key (schema/graph) for schema-scoped resolution.
3232        let key = self.active_graph_storage_key();
3233        let mut touched = self.touched_graphs.lock();
3234        touched.clear();
3235        touched.push(key);
3236
3237        #[cfg(feature = "metrics")]
3238        {
3239            crate::metrics::record_metric!(self.metrics, tx_active, inc);
3240            #[cfg(not(target_arch = "wasm32"))]
3241            {
3242                *self.tx_start_time.lock() = Some(Instant::now());
3243            }
3244        }
3245
3246        Ok(())
3247    }
3248
3249    /// Commits the current transaction.
3250    ///
3251    /// Makes all changes since [`begin_transaction`](Self::begin_transaction) permanent.
3252    ///
3253    /// # Errors
3254    ///
3255    /// Returns an error if no transaction is active.
3256    pub fn commit(&mut self) -> Result<()> {
3257        self.commit_inner()
3258    }
3259
3260    /// Core commit logic, usable from both `&mut self` and `&self` paths.
3261    fn commit_inner(&self) -> Result<()> {
3262        let _span = grafeo_debug_span!("grafeo::tx::commit");
3263        // Nested transaction: release the auto-savepoint (changes are preserved).
3264        {
3265            let mut depth = self.transaction_nesting_depth.lock();
3266            if *depth > 0 {
3267                let sp_name = format!("_nested_tx_{depth}");
3268                *depth -= 1;
3269                drop(depth);
3270                return self.release_savepoint(&sp_name);
3271            }
3272        }
3273
3274        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3275            grafeo_common::utils::error::Error::Transaction(
3276                grafeo_common::utils::error::TransactionError::InvalidState(
3277                    "No active transaction".to_string(),
3278                ),
3279            )
3280        })?;
3281
3282        // Validate the transaction first (conflict detection) before committing data.
3283        // If this fails, we rollback the data changes instead of making them permanent.
3284        let touched = self.touched_graphs.lock().clone();
3285        let commit_epoch = match self.transaction_manager.commit(transaction_id) {
3286            Ok(epoch) => epoch,
3287            Err(e) => {
3288                // Conflict detected: rollback the data changes
3289                for graph_name in &touched {
3290                    let store = self.resolve_store(graph_name);
3291                    store.rollback_transaction_properties(transaction_id);
3292                }
3293                #[cfg(feature = "rdf")]
3294                self.rollback_rdf_transaction(transaction_id);
3295                *self.read_only_tx.lock() = self.db_read_only;
3296                self.savepoints.lock().clear();
3297                self.touched_graphs.lock().clear();
3298                #[cfg(feature = "metrics")]
3299                {
3300                    crate::metrics::record_metric!(self.metrics, tx_active, dec);
3301                    crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
3302                    #[cfg(not(target_arch = "wasm32"))]
3303                    if let Some(start) = self.tx_start_time.lock().take() {
3304                        let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3305                        crate::metrics::record_metric!(
3306                            self.metrics,
3307                            tx_duration,
3308                            observe duration_ms
3309                        );
3310                    }
3311                }
3312                return Err(e);
3313            }
3314        };
3315
3316        // Finalize PENDING epochs: make uncommitted versions visible at the commit epoch.
3317        for graph_name in &touched {
3318            let store = self.resolve_store(graph_name);
3319            store.finalize_version_epochs(transaction_id, commit_epoch);
3320        }
3321
3322        // Commit succeeded: discard undo logs (make changes permanent)
3323        #[cfg(feature = "rdf")]
3324        self.commit_rdf_transaction(transaction_id);
3325
3326        for graph_name in &touched {
3327            let store = self.resolve_store(graph_name);
3328            store.commit_transaction_properties(transaction_id);
3329        }
3330
3331        // Sync epoch for all touched graphs so that convenience lookups
3332        // (edge_type, get_edge, get_node) can see versions at the latest epoch.
3333        let current_epoch = self.transaction_manager.current_epoch();
3334        for graph_name in &touched {
3335            let store = self.resolve_store(graph_name);
3336            store.sync_epoch(current_epoch);
3337        }
3338
3339        // Reset read-only flag, clear savepoints and touched graphs
3340        *self.read_only_tx.lock() = self.db_read_only;
3341        self.savepoints.lock().clear();
3342        self.touched_graphs.lock().clear();
3343
3344        // Auto-GC: periodically prune old MVCC versions
3345        if self.gc_interval > 0 {
3346            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
3347            if count.is_multiple_of(self.gc_interval) {
3348                let min_epoch = self.transaction_manager.min_active_epoch();
3349                for graph_name in &touched {
3350                    let store = self.resolve_store(graph_name);
3351                    store.gc_versions(min_epoch);
3352                }
3353                self.transaction_manager.gc();
3354                #[cfg(feature = "metrics")]
3355                crate::metrics::record_metric!(self.metrics, gc_runs, inc);
3356            }
3357        }
3358
3359        #[cfg(feature = "metrics")]
3360        {
3361            crate::metrics::record_metric!(self.metrics, tx_active, dec);
3362            crate::metrics::record_metric!(self.metrics, tx_committed, inc);
3363            #[cfg(not(target_arch = "wasm32"))]
3364            if let Some(start) = self.tx_start_time.lock().take() {
3365                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3366                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3367            }
3368        }
3369
3370        Ok(())
3371    }
3372
3373    /// Aborts the current transaction.
3374    ///
3375    /// Discards all changes since [`begin_transaction`](Self::begin_transaction).
3376    ///
3377    /// # Errors
3378    ///
3379    /// Returns an error if no transaction is active.
3380    ///
3381    /// # Examples
3382    ///
3383    /// ```no_run
3384    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3385    /// use grafeo_engine::GrafeoDB;
3386    ///
3387    /// let db = GrafeoDB::new_in_memory();
3388    /// let mut session = db.session();
3389    ///
3390    /// session.begin_transaction()?;
3391    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3392    /// session.rollback()?; // Insert is discarded
3393    /// # Ok(())
3394    /// # }
3395    /// ```
3396    pub fn rollback(&mut self) -> Result<()> {
3397        self.rollback_inner()
3398    }
3399
3400    /// Core rollback logic, usable from both `&mut self` and `&self` paths.
3401    fn rollback_inner(&self) -> Result<()> {
3402        let _span = grafeo_debug_span!("grafeo::tx::rollback");
3403        // Nested transaction: rollback to the auto-savepoint.
3404        {
3405            let mut depth = self.transaction_nesting_depth.lock();
3406            if *depth > 0 {
3407                let sp_name = format!("_nested_tx_{depth}");
3408                *depth -= 1;
3409                drop(depth);
3410                return self.rollback_to_savepoint(&sp_name);
3411            }
3412        }
3413
3414        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3415            grafeo_common::utils::error::Error::Transaction(
3416                grafeo_common::utils::error::TransactionError::InvalidState(
3417                    "No active transaction".to_string(),
3418                ),
3419            )
3420        })?;
3421
3422        // Reset read-only flag
3423        *self.read_only_tx.lock() = self.db_read_only;
3424
3425        // Discard uncommitted versions in ALL touched LPG stores (cross-graph atomicity).
3426        let touched = self.touched_graphs.lock().clone();
3427        for graph_name in &touched {
3428            let store = self.resolve_store(graph_name);
3429            store.discard_uncommitted_versions(transaction_id);
3430        }
3431
3432        // Discard pending operations in the RDF store
3433        #[cfg(feature = "rdf")]
3434        self.rollback_rdf_transaction(transaction_id);
3435
3436        // Clear savepoints and touched graphs
3437        self.savepoints.lock().clear();
3438        self.touched_graphs.lock().clear();
3439
3440        // Mark transaction as aborted in the manager
3441        let result = self.transaction_manager.abort(transaction_id);
3442
3443        #[cfg(feature = "metrics")]
3444        if result.is_ok() {
3445            crate::metrics::record_metric!(self.metrics, tx_active, dec);
3446            crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
3447            #[cfg(not(target_arch = "wasm32"))]
3448            if let Some(start) = self.tx_start_time.lock().take() {
3449                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3450                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3451            }
3452        }
3453
3454        result
3455    }
3456
3457    /// Creates a named savepoint within the current transaction.
3458    ///
3459    /// The savepoint captures the current node/edge ID counters so that
3460    /// [`rollback_to_savepoint`](Self::rollback_to_savepoint) can discard
3461    /// entities created after this point.
3462    ///
3463    /// # Errors
3464    ///
3465    /// Returns an error if no transaction is active.
3466    pub fn savepoint(&self, name: &str) -> Result<()> {
3467        let tx_id = self.current_transaction.lock().ok_or_else(|| {
3468            grafeo_common::utils::error::Error::Transaction(
3469                grafeo_common::utils::error::TransactionError::InvalidState(
3470                    "No active transaction".to_string(),
3471                ),
3472            )
3473        })?;
3474
3475        // Capture state for every graph touched so far.
3476        let touched = self.touched_graphs.lock().clone();
3477        let graph_snapshots: Vec<GraphSavepoint> = touched
3478            .iter()
3479            .map(|graph_name| {
3480                let store = self.resolve_store(graph_name);
3481                GraphSavepoint {
3482                    graph_name: graph_name.clone(),
3483                    next_node_id: store.peek_next_node_id(),
3484                    next_edge_id: store.peek_next_edge_id(),
3485                    undo_log_position: store.property_undo_log_position(tx_id),
3486                }
3487            })
3488            .collect();
3489
3490        self.savepoints.lock().push(SavepointState {
3491            name: name.to_string(),
3492            graph_snapshots,
3493            active_graph: self.current_graph.lock().clone(),
3494        });
3495        Ok(())
3496    }
3497
3498    /// Rolls back to a named savepoint, undoing all writes made after it.
3499    ///
3500    /// The savepoint and any savepoints created after it are removed.
3501    /// Entities with IDs >= the savepoint snapshot are discarded.
3502    ///
3503    /// # Errors
3504    ///
3505    /// Returns an error if no transaction is active or the savepoint does not exist.
3506    pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
3507        let transaction_id = self.current_transaction.lock().ok_or_else(|| {
3508            grafeo_common::utils::error::Error::Transaction(
3509                grafeo_common::utils::error::TransactionError::InvalidState(
3510                    "No active transaction".to_string(),
3511                ),
3512            )
3513        })?;
3514
3515        let mut savepoints = self.savepoints.lock();
3516
3517        // Find the savepoint by name (search from the end for nested savepoints)
3518        let pos = savepoints
3519            .iter()
3520            .rposition(|sp| sp.name == name)
3521            .ok_or_else(|| {
3522                grafeo_common::utils::error::Error::Transaction(
3523                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
3524                        "Savepoint '{name}' not found"
3525                    )),
3526                )
3527            })?;
3528
3529        let sp_state = savepoints[pos].clone();
3530
3531        // Remove this savepoint and all later ones
3532        savepoints.truncate(pos);
3533        drop(savepoints);
3534
3535        // Roll back each graph that was captured in the savepoint.
3536        for gs in &sp_state.graph_snapshots {
3537            let store = self.resolve_store(&gs.graph_name);
3538
3539            // Replay property/label undo entries recorded after the savepoint
3540            store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);
3541
3542            // Discard entities created after the savepoint
3543            let current_next_node = store.peek_next_node_id();
3544            let current_next_edge = store.peek_next_edge_id();
3545
3546            let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
3547                .map(NodeId::new)
3548                .collect();
3549            let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
3550                .map(EdgeId::new)
3551                .collect();
3552
3553            if !node_ids.is_empty() || !edge_ids.is_empty() {
3554                store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
3555            }
3556        }
3557
3558        // Also roll back any graphs that were touched AFTER the savepoint
3559        // but not captured in it. These need full discard since the savepoint
3560        // didn't include them.
3561        let touched = self.touched_graphs.lock().clone();
3562        for graph_name in &touched {
3563            let already_captured = sp_state
3564                .graph_snapshots
3565                .iter()
3566                .any(|gs| gs.graph_name == *graph_name);
3567            if !already_captured {
3568                let store = self.resolve_store(graph_name);
3569                store.discard_uncommitted_versions(transaction_id);
3570            }
3571        }
3572
3573        // Restore touched_graphs to only the graphs that were known at savepoint time.
3574        let mut touched = self.touched_graphs.lock();
3575        touched.clear();
3576        for gs in &sp_state.graph_snapshots {
3577            if !touched.contains(&gs.graph_name) {
3578                touched.push(gs.graph_name.clone());
3579            }
3580        }
3581
3582        Ok(())
3583    }
3584
3585    /// Releases (removes) a named savepoint without rolling back.
3586    ///
3587    /// # Errors
3588    ///
3589    /// Returns an error if no transaction is active or the savepoint does not exist.
3590    pub fn release_savepoint(&self, name: &str) -> Result<()> {
3591        let _tx_id = self.current_transaction.lock().ok_or_else(|| {
3592            grafeo_common::utils::error::Error::Transaction(
3593                grafeo_common::utils::error::TransactionError::InvalidState(
3594                    "No active transaction".to_string(),
3595                ),
3596            )
3597        })?;
3598
3599        let mut savepoints = self.savepoints.lock();
3600        let pos = savepoints
3601            .iter()
3602            .rposition(|sp| sp.name == name)
3603            .ok_or_else(|| {
3604                grafeo_common::utils::error::Error::Transaction(
3605                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
3606                        "Savepoint '{name}' not found"
3607                    )),
3608                )
3609            })?;
3610        savepoints.remove(pos);
3611        Ok(())
3612    }
3613
3614    /// Returns whether a transaction is active.
3615    #[must_use]
3616    pub fn in_transaction(&self) -> bool {
3617        self.current_transaction.lock().is_some()
3618    }
3619
3620    /// Returns the current transaction ID, if any.
3621    #[must_use]
3622    pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
3623        *self.current_transaction.lock()
3624    }
3625
3626    /// Returns a reference to the transaction manager.
3627    #[must_use]
3628    pub(crate) fn transaction_manager(&self) -> &TransactionManager {
3629        &self.transaction_manager
3630    }
3631
3632    /// Returns the store's current node count and the count at transaction start.
3633    #[must_use]
3634    pub(crate) fn node_count_delta(&self) -> (usize, usize) {
3635        (
3636            self.transaction_start_node_count.load(Ordering::Relaxed),
3637            self.active_lpg_store().node_count(),
3638        )
3639    }
3640
3641    /// Returns the store's current edge count and the count at transaction start.
3642    #[must_use]
3643    pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
3644        (
3645            self.transaction_start_edge_count.load(Ordering::Relaxed),
3646            self.active_lpg_store().edge_count(),
3647        )
3648    }
3649
3650    /// Prepares the current transaction for a two-phase commit.
3651    ///
3652    /// Returns a [`PreparedCommit`](crate::transaction::PreparedCommit) that
3653    /// lets you inspect pending changes and attach metadata before finalizing.
3654    /// The mutable borrow prevents concurrent operations while the commit is
3655    /// pending.
3656    ///
3657    /// If the `PreparedCommit` is dropped without calling `commit()` or
3658    /// `abort()`, the transaction is automatically rolled back.
3659    ///
3660    /// # Errors
3661    ///
3662    /// Returns an error if no transaction is active.
3663    ///
3664    /// # Examples
3665    ///
3666    /// ```no_run
3667    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3668    /// use grafeo_engine::GrafeoDB;
3669    ///
3670    /// let db = GrafeoDB::new_in_memory();
3671    /// let mut session = db.session();
3672    ///
3673    /// session.begin_transaction()?;
3674    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3675    ///
3676    /// let mut prepared = session.prepare_commit()?;
3677    /// println!("Nodes written: {}", prepared.info().nodes_written);
3678    /// prepared.set_metadata("audit_user", "admin");
3679    /// prepared.commit()?;
3680    /// # Ok(())
3681    /// # }
3682    /// ```
3683    pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
3684        crate::transaction::PreparedCommit::new(self)
3685    }
3686
3687    /// Sets auto-commit mode.
3688    pub fn set_auto_commit(&mut self, auto_commit: bool) {
3689        self.auto_commit = auto_commit;
3690    }
3691
3692    /// Returns whether auto-commit is enabled.
3693    #[must_use]
3694    pub fn auto_commit(&self) -> bool {
3695        self.auto_commit
3696    }
3697
3698    /// Returns `true` if auto-commit should wrap this execution.
3699    ///
3700    /// Auto-commit kicks in when: the session is in auto-commit mode,
3701    /// no explicit transaction is active, and the query mutates data.
3702    fn needs_auto_commit(&self, has_mutations: bool) -> bool {
3703        self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
3704    }
3705
3706    /// Wraps `body` in an automatic begin/commit when [`needs_auto_commit`]
3707    /// returns `true`. On error the transaction is rolled back.
3708    fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
3709    where
3710        F: FnOnce() -> Result<QueryResult>,
3711    {
3712        if self.needs_auto_commit(has_mutations) {
3713            self.begin_transaction_inner(false, None)?;
3714            match body() {
3715                Ok(result) => {
3716                    self.commit_inner()?;
3717                    Ok(result)
3718                }
3719                Err(e) => {
3720                    let _ = self.rollback_inner();
3721                    Err(e)
3722                }
3723            }
3724        } else {
3725            body()
3726        }
3727    }
3728
/// Cheap text heuristic: does this query look like it changes data?
///
/// Used by the `_with_params` paths that go through the `QueryProcessor`,
/// where no logical plan is available before execution. The check is a
/// case-insensitive *substring* search for a fixed list of keywords, so it can
/// report `true` for read-only queries that merely contain one of them
/// (e.g. `OFFSET` contains `SET`). False negatives are harmless: the data just
/// won't be auto-committed, which matches the prior behaviour.
fn query_looks_like_mutation(query: &str) -> bool {
    const MUTATION_KEYWORDS: [&str; 8] = [
        "INSERT", "CREATE", "DELETE", "MERGE", "SET", "REMOVE", "DROP", "ALTER",
    ];

    let normalized = query.to_ascii_uppercase();
    MUTATION_KEYWORDS
        .iter()
        .any(|keyword| normalized.contains(*keyword))
}
3745
3746    /// Computes the wall-clock deadline for query execution.
3747    #[must_use]
3748    fn query_deadline(&self) -> Option<Instant> {
3749        #[cfg(not(target_arch = "wasm32"))]
3750        {
3751            self.query_timeout.map(|d| Instant::now() + d)
3752        }
3753        #[cfg(target_arch = "wasm32")]
3754        {
3755            let _ = &self.query_timeout;
3756            None
3757        }
3758    }
3759
/// Updates query metrics after a query in any language has run.
///
/// Bumps the total and per-language query counters, observes the latency
/// when `elapsed_ms` is provided (pass `None` on WASM where `Instant` is
/// unavailable), and then records either row counts (success) or error and
/// timeout counters (failure).
#[cfg(feature = "metrics")]
fn record_query_metrics(
    &self,
    language: &str,
    elapsed_ms: Option<f64>,
    result: &Result<crate::database::QueryResult>,
) {
    use crate::metrics::record_metric;

    record_metric!(self.metrics, query_count, inc);
    if let Some(registry) = self.metrics.as_ref() {
        registry.query_count_by_language.increment(language);
    }
    if let Some(latency_ms) = elapsed_ms {
        record_metric!(self.metrics, query_latency, observe latency_ms);
    }

    match result {
        Err(error) => {
            record_metric!(self.metrics, query_errors, inc);
            // Timeouts are recognised by their error message text.
            if error.to_string().contains("exceeded timeout") {
                record_metric!(self.metrics, query_timeouts, inc);
            }
        }
        Ok(query_result) => {
            let row_total = query_result.rows.len() as u64;
            record_metric!(self.metrics, rows_returned, add row_total);
            if let Some(scanned_total) = query_result.rows_scanned {
                record_metric!(self.metrics, rows_scanned, add scanned_total);
            }
        }
    }
}
3799
3800    /// Evaluates a simple integer literal from a session parameter expression.
3801    fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
3802        use grafeo_adapters::query::gql::ast::{Expression, Literal};
3803        match expr {
3804            Expression::Literal(Literal::Integer(n)) => Some(*n),
3805            _ => None,
3806        }
3807    }
3808
3809    /// Returns the current transaction context for MVCC visibility.
3810    ///
3811    /// Returns `(viewing_epoch, transaction_id)` where:
3812    /// - `viewing_epoch` is the epoch at which to check version visibility
3813    /// - `transaction_id` is the current transaction ID (if in a transaction)
3814    #[must_use]
3815    fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
3816        // Time-travel override takes precedence (read-only, no tx context)
3817        if let Some(epoch) = *self.viewing_epoch_override.lock() {
3818            return (epoch, None);
3819        }
3820
3821        if let Some(transaction_id) = *self.current_transaction.lock() {
3822            // In a transaction: use the transaction's start epoch
3823            let epoch = self
3824                .transaction_manager
3825                .start_epoch(transaction_id)
3826                .unwrap_or_else(|| self.transaction_manager.current_epoch());
3827            (epoch, Some(transaction_id))
3828        } else {
3829            // No transaction: use current epoch
3830            (self.transaction_manager.current_epoch(), None)
3831        }
3832    }
3833
3834    /// Creates a planner with transaction context and constraint validator.
3835    ///
3836    /// The `store` parameter is the graph store to plan against (use
3837    /// `self.active_store()` for graph-aware execution).
3838    fn create_planner_for_store(
3839        &self,
3840        store: Arc<dyn GraphStore>,
3841        viewing_epoch: EpochId,
3842        transaction_id: Option<TransactionId>,
3843    ) -> crate::query::Planner {
3844        self.create_planner_for_store_with_read_only(store, viewing_epoch, transaction_id, false)
3845    }
3846
3847    fn create_planner_for_store_with_read_only(
3848        &self,
3849        store: Arc<dyn GraphStore>,
3850        viewing_epoch: EpochId,
3851        transaction_id: Option<TransactionId>,
3852        read_only: bool,
3853    ) -> crate::query::Planner {
3854        use crate::query::Planner;
3855        use grafeo_core::execution::operators::{LazyValue, SessionContext};
3856
3857        // Capture store reference for lazy introspection (only computed if info()/schema() called).
3858        let info_store = Arc::clone(&store);
3859        let schema_store = Arc::clone(&store);
3860
3861        let session_context = SessionContext {
3862            current_schema: self.current_schema(),
3863            current_graph: self.current_graph(),
3864            db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
3865            schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
3866        };
3867
3868        let write_store = self.active_write_store();
3869
3870        let mut planner = Planner::with_context(
3871            Arc::clone(&store),
3872            write_store,
3873            Arc::clone(&self.transaction_manager),
3874            transaction_id,
3875            viewing_epoch,
3876        )
3877        .with_factorized_execution(self.factorized_execution)
3878        .with_catalog(Arc::clone(&self.catalog))
3879        .with_session_context(session_context)
3880        .with_read_only(read_only);
3881
3882        // Attach the constraint validator for schema enforcement
3883        let validator =
3884            CatalogConstraintValidator::new(Arc::clone(&self.catalog)).with_store(store);
3885        planner = planner.with_validator(Arc::new(validator));
3886
3887        planner
3888    }
3889
3890    /// Builds a `Value::Map` for the `info()` introspection function.
3891    fn build_info_value(store: &dyn GraphStore) -> Value {
3892        use grafeo_common::types::PropertyKey;
3893        use std::collections::BTreeMap;
3894
3895        let mut map = BTreeMap::new();
3896        map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
3897        map.insert(
3898            PropertyKey::from("node_count"),
3899            Value::Int64(store.node_count() as i64),
3900        );
3901        map.insert(
3902            PropertyKey::from("edge_count"),
3903            Value::Int64(store.edge_count() as i64),
3904        );
3905        map.insert(
3906            PropertyKey::from("version"),
3907            Value::String(env!("CARGO_PKG_VERSION").into()),
3908        );
3909        Value::Map(map.into())
3910    }
3911
3912    /// Builds a `Value::Map` for the `schema()` introspection function.
3913    fn build_schema_value(store: &dyn GraphStore) -> Value {
3914        use grafeo_common::types::PropertyKey;
3915        use std::collections::BTreeMap;
3916
3917        let labels: Vec<Value> = store
3918            .all_labels()
3919            .into_iter()
3920            .map(|l| Value::String(l.into()))
3921            .collect();
3922        let edge_types: Vec<Value> = store
3923            .all_edge_types()
3924            .into_iter()
3925            .map(|t| Value::String(t.into()))
3926            .collect();
3927        let property_keys: Vec<Value> = store
3928            .all_property_keys()
3929            .into_iter()
3930            .map(|k| Value::String(k.into()))
3931            .collect();
3932
3933        let mut map = BTreeMap::new();
3934        map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
3935        map.insert(
3936            PropertyKey::from("edge_types"),
3937            Value::List(edge_types.into()),
3938        );
3939        map.insert(
3940            PropertyKey::from("property_keys"),
3941            Value::List(property_keys.into()),
3942        );
3943        Value::Map(map.into())
3944    }
3945
3946    /// Creates a node directly (bypassing query execution).
3947    ///
3948    /// This is a low-level API for testing and direct manipulation.
3949    /// If a transaction is active, the node will be versioned with the transaction ID.
3950    pub fn create_node(&self, labels: &[&str]) -> NodeId {
3951        let (epoch, transaction_id) = self.get_transaction_context();
3952        self.active_lpg_store().create_node_versioned(
3953            labels,
3954            epoch,
3955            transaction_id.unwrap_or(TransactionId::SYSTEM),
3956        )
3957    }
3958
3959    /// Creates a node with properties.
3960    ///
3961    /// If a transaction is active, the node will be versioned with the transaction ID.
3962    pub fn create_node_with_props<'a>(
3963        &self,
3964        labels: &[&str],
3965        properties: impl IntoIterator<Item = (&'a str, Value)>,
3966    ) -> NodeId {
3967        let (epoch, transaction_id) = self.get_transaction_context();
3968        self.active_lpg_store().create_node_with_props_versioned(
3969            labels,
3970            properties,
3971            epoch,
3972            transaction_id.unwrap_or(TransactionId::SYSTEM),
3973        )
3974    }
3975
3976    /// Creates an edge between two nodes.
3977    ///
3978    /// This is a low-level API for testing and direct manipulation.
3979    /// If a transaction is active, the edge will be versioned with the transaction ID.
3980    pub fn create_edge(
3981        &self,
3982        src: NodeId,
3983        dst: NodeId,
3984        edge_type: &str,
3985    ) -> grafeo_common::types::EdgeId {
3986        let (epoch, transaction_id) = self.get_transaction_context();
3987        self.active_lpg_store().create_edge_versioned(
3988            src,
3989            dst,
3990            edge_type,
3991            epoch,
3992            transaction_id.unwrap_or(TransactionId::SYSTEM),
3993        )
3994    }
3995
3996    // =========================================================================
3997    // Direct Lookup APIs (bypass query planning for O(1) point reads)
3998    // =========================================================================
3999
4000    /// Gets a node by ID directly, bypassing query planning.
4001    ///
4002    /// This is the fastest way to retrieve a single node when you know its ID.
4003    /// Skips parsing, binding, optimization, and physical planning entirely.
4004    ///
4005    /// # Performance
4006    ///
4007    /// - Time complexity: O(1) average case
4008    /// - No lock contention (uses DashMap internally)
4009    /// - ~20-30x faster than equivalent MATCH query
4010    ///
4011    /// # Example
4012    ///
4013    /// ```no_run
4014    /// # use grafeo_engine::GrafeoDB;
4015    /// # let db = GrafeoDB::new_in_memory();
4016    /// let session = db.session();
4017    /// let node_id = session.create_node(&["Person"]);
4018    ///
4019    /// // Direct lookup - O(1), no query planning
4020    /// let node = session.get_node(node_id);
4021    /// assert!(node.is_some());
4022    /// ```
4023    #[must_use]
4024    pub fn get_node(&self, id: NodeId) -> Option<Node> {
4025        let (epoch, transaction_id) = self.get_transaction_context();
4026        self.active_lpg_store().get_node_versioned(
4027            id,
4028            epoch,
4029            transaction_id.unwrap_or(TransactionId::SYSTEM),
4030        )
4031    }
4032
4033    /// Gets a single property from a node by ID, bypassing query planning.
4034    ///
4035    /// More efficient than `get_node()` when you only need one property,
4036    /// as it avoids loading the full node with all properties.
4037    ///
4038    /// # Performance
4039    ///
4040    /// - Time complexity: O(1) average case
4041    /// - No query planning overhead
4042    ///
4043    /// # Example
4044    ///
4045    /// ```no_run
4046    /// # use grafeo_engine::GrafeoDB;
4047    /// # use grafeo_common::types::Value;
4048    /// # let db = GrafeoDB::new_in_memory();
4049    /// let session = db.session();
4050    /// let id = session.create_node_with_props(&["Person"], [("name", "Alix".into())]);
4051    ///
4052    /// // Direct property access - O(1)
4053    /// let name = session.get_node_property(id, "name");
4054    /// assert_eq!(name, Some(Value::String("Alix".into())));
4055    /// ```
4056    #[must_use]
4057    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
4058        self.get_node(id)
4059            .and_then(|node| node.get_property(key).cloned())
4060    }
4061
4062    /// Gets an edge by ID directly, bypassing query planning.
4063    ///
4064    /// # Performance
4065    ///
4066    /// - Time complexity: O(1) average case
4067    /// - No lock contention
4068    #[must_use]
4069    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
4070        let (epoch, transaction_id) = self.get_transaction_context();
4071        self.active_lpg_store().get_edge_versioned(
4072            id,
4073            epoch,
4074            transaction_id.unwrap_or(TransactionId::SYSTEM),
4075        )
4076    }
4077
4078    /// Gets outgoing neighbors of a node directly, bypassing query planning.
4079    ///
4080    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
4081    ///
4082    /// # Performance
4083    ///
4084    /// - Time complexity: O(degree) where degree is the number of outgoing edges
4085    /// - Uses adjacency index for direct access
4086    /// - ~10-20x faster than equivalent MATCH query
4087    ///
4088    /// # Example
4089    ///
4090    /// ```no_run
4091    /// # use grafeo_engine::GrafeoDB;
4092    /// # let db = GrafeoDB::new_in_memory();
4093    /// let session = db.session();
4094    /// let alix = session.create_node(&["Person"]);
4095    /// let gus = session.create_node(&["Person"]);
4096    /// session.create_edge(alix, gus, "KNOWS");
4097    ///
4098    /// // Direct neighbor lookup - O(degree)
4099    /// let neighbors = session.get_neighbors_outgoing(alix);
4100    /// assert_eq!(neighbors.len(), 1);
4101    /// assert_eq!(neighbors[0].0, gus);
4102    /// ```
4103    #[must_use]
4104    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4105        self.active_lpg_store()
4106            .edges_from(node, Direction::Outgoing)
4107            .collect()
4108    }
4109
4110    /// Gets incoming neighbors of a node directly, bypassing query planning.
4111    ///
4112    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
4113    ///
4114    /// # Performance
4115    ///
4116    /// - Time complexity: O(degree) where degree is the number of incoming edges
4117    /// - Uses backward adjacency index for direct access
4118    #[must_use]
4119    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4120        self.active_lpg_store()
4121            .edges_from(node, Direction::Incoming)
4122            .collect()
4123    }
4124
4125    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
4126    ///
4127    /// # Example
4128    ///
4129    /// ```no_run
4130    /// # use grafeo_engine::GrafeoDB;
4131    /// # let db = GrafeoDB::new_in_memory();
4132    /// # let session = db.session();
4133    /// # let alix = session.create_node(&["Person"]);
4134    /// let neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
4135    /// ```
4136    #[must_use]
4137    pub fn get_neighbors_outgoing_by_type(
4138        &self,
4139        node: NodeId,
4140        edge_type: &str,
4141    ) -> Vec<(NodeId, EdgeId)> {
4142        self.active_lpg_store()
4143            .edges_from(node, Direction::Outgoing)
4144            .filter(|(_, edge_id)| {
4145                self.get_edge(*edge_id)
4146                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
4147            })
4148            .collect()
4149    }
4150
4151    /// Checks if a node exists, bypassing query planning.
4152    ///
4153    /// # Performance
4154    ///
4155    /// - Time complexity: O(1)
4156    /// - Fastest existence check available
4157    #[must_use]
4158    pub fn node_exists(&self, id: NodeId) -> bool {
4159        self.get_node(id).is_some()
4160    }
4161
4162    /// Checks if an edge exists, bypassing query planning.
4163    #[must_use]
4164    pub fn edge_exists(&self, id: EdgeId) -> bool {
4165        self.get_edge(id).is_some()
4166    }
4167
4168    /// Gets the degree (number of edges) of a node.
4169    ///
4170    /// Returns (outgoing_degree, incoming_degree).
4171    #[must_use]
4172    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
4173        let active = self.active_lpg_store();
4174        let out = active.out_degree(node);
4175        let in_degree = active.in_degree(node);
4176        (out, in_degree)
4177    }
4178
4179    /// Batch lookup of multiple nodes by ID.
4180    ///
4181    /// More efficient than calling `get_node()` in a loop because it
4182    /// amortizes overhead.
4183    ///
4184    /// # Performance
4185    ///
4186    /// - Time complexity: O(n) where n is the number of IDs
4187    /// - Better cache utilization than individual lookups
4188    #[must_use]
4189    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
4190        let (epoch, transaction_id) = self.get_transaction_context();
4191        let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
4192        let active = self.active_lpg_store();
4193        ids.iter()
4194            .map(|&id| active.get_node_versioned(id, epoch, tx))
4195            .collect()
4196    }
4197
4198    // ── Change Data Capture ─────────────────────────────────────────────
4199
/// Fetches every recorded change event for one entity (node or edge) from
/// the CDC log.
#[cfg(feature = "cdc")]
pub fn history(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let entity: crate::cdc::EntityId = entity_id.into();
    let events = self.cdc_log.history(entity);
    Ok(events)
}
4208
/// Fetches the change events recorded for one entity since `since_epoch`
/// from the CDC log.
#[cfg(feature = "cdc")]
pub fn history_since(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
    since_epoch: EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let entity: crate::cdc::EntityId = entity_id.into();
    let events = self.cdc_log.history_since(entity, since_epoch);
    Ok(events)
}
4218
/// Fetches the change events of all entities within an epoch range from the
/// CDC log.
#[cfg(feature = "cdc")]
pub fn changes_between(
    &self,
    start_epoch: EpochId,
    end_epoch: EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let events = self.cdc_log.changes_between(start_epoch, end_epoch);
    Ok(events)
}
4228}
4229
4230impl Drop for Session {
4231    fn drop(&mut self) {
4232        // Auto-rollback any active transaction to prevent leaked MVCC state,
4233        // dangling write locks, and uncommitted versions lingering in the store.
4234        if self.in_transaction() {
4235            let _ = self.rollback_inner();
4236        }
4237
4238        #[cfg(feature = "metrics")]
4239        if let Some(ref reg) = self.metrics {
4240            reg.session_active
4241                .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
4242        }
4243    }
4244}
4245
4246#[cfg(test)]
4247mod tests {
4248    use super::parse_default_literal;
4249    use crate::database::GrafeoDB;
4250    use grafeo_common::types::Value;
4251
4252    // -----------------------------------------------------------------------
4253    // parse_default_literal
4254    // -----------------------------------------------------------------------
4255
#[test]
fn parse_default_literal_null() {
    // The NULL keyword is recognised regardless of ASCII case.
    for text in ["null", "NULL", "Null"] {
        assert_eq!(parse_default_literal(text), Value::Null);
    }
}
4262
#[test]
fn parse_default_literal_bool() {
    // Boolean keywords are recognised regardless of ASCII case.
    let cases = [
        ("true", true),
        ("TRUE", true),
        ("false", false),
        ("FALSE", false),
    ];
    for (text, expected) in cases {
        assert_eq!(parse_default_literal(text), Value::Bool(expected));
    }
}
4270
#[test]
fn parse_default_literal_string_single_quoted() {
    // Surrounding single quotes are stripped from the literal.
    let parsed = parse_default_literal("'hello'");
    assert_eq!(parsed, Value::String("hello".into()));
}
4278
#[test]
fn parse_default_literal_string_double_quoted() {
    // Surrounding double quotes are stripped from the literal.
    let parsed = parse_default_literal("\"world\"");
    assert_eq!(parsed, Value::String("world".into()));
}
4286
#[test]
fn parse_default_literal_integer() {
    // Positive, negative, and zero integers all become Int64.
    for (text, expected) in [("42", 42_i64), ("-7", -7), ("0", 0)] {
        assert_eq!(parse_default_literal(text), Value::Int64(expected));
    }
}
4293
#[test]
fn parse_default_literal_float() {
    // Text that is not an integer but parses as a float becomes Float64.
    for (text, expected) in [("9.81", 9.81_f64), ("-0.5", -0.5)] {
        assert_eq!(parse_default_literal(text), Value::Float64(expected));
    }
}
4299
#[test]
fn parse_default_literal_fallback_string() {
    // Neither a keyword, a quoted string, nor a number: kept as a string.
    let parsed = parse_default_literal("some_identifier");
    assert_eq!(parsed, Value::String("some_identifier".into()));
}
4308
#[test]
fn test_session_create_node() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // A node created through the session is counted by the database.
    let node_id = session.create_node(&["Person"]);
    assert!(node_id.is_valid());
    assert_eq!(db.node_count(), 1);
}
4318
#[test]
fn test_session_transaction() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // Fresh session: nothing open yet.
    assert!(!session.in_transaction());

    // begin -> open
    session.begin_transaction().unwrap();
    assert!(session.in_transaction());

    // commit -> closed again
    session.commit().unwrap();
    assert!(!session.in_transaction());
}
4332
#[test]
fn test_session_transaction_context() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // Outside a transaction the context carries no transaction ID.
    let (_, tx_before) = session.get_transaction_context();
    assert!(tx_before.is_none());

    // Inside a transaction the context carries the transaction's ID.
    session.begin_transaction().unwrap();
    let (epoch_during, tx_during) = session.get_transaction_context();
    assert!(tx_during.is_some());

    // After commit the ID is gone and the epoch has not moved backwards.
    session.commit().unwrap();
    let (epoch_after, tx_after) = session.get_transaction_context();
    assert!(tx_after.is_none());
    assert!(epoch_after.as_u64() >= epoch_during.as_u64());
}
4356
#[test]
fn test_session_rollback() {
    let db = GrafeoDB::new_in_memory();
    let mut session = db.session();

    // Rolling back an (empty) transaction closes it.
    session.begin_transaction().unwrap();
    session.rollback().unwrap();
    assert!(!session.in_transaction());
}
4366
#[test]
fn test_session_rollback_discards_versions() {
    use grafeo_common::types::TransactionId;

    let db = GrafeoDB::new_in_memory();

    // Baseline: one node created at system level, outside any transaction.
    let committed_node = db.store().create_node(&["Person"]);
    assert!(committed_node.is_valid());
    assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");

    // Open a transaction and remember its ID.
    let mut session = db.session();
    session.begin_transaction().unwrap();
    let tx = session.current_transaction.lock().unwrap();

    // Write a second node directly to the store, owned by that transaction.
    let tx_epoch = db.store().current_epoch();
    let pending_node = db
        .store()
        .create_node_versioned(&["Person"], tx_epoch, tx);
    assert!(pending_node.is_valid());

    // Uncommitted nodes use EpochId::PENDING, so non-versioned lookups such
    // as node_count() must not see them; only the owning transaction does.
    assert_eq!(
        db.node_count(),
        1,
        "PENDING nodes should be invisible to non-versioned node_count()"
    );
    let seen_by_owner = db
        .store()
        .get_node_versioned(pending_node, tx_epoch, tx)
        .is_some();
    assert!(
        seen_by_owner,
        "Transaction node should be visible to its own transaction"
    );

    // Roll the transaction back.
    session.rollback().unwrap();
    assert!(!session.in_transaction());

    // Only the baseline node may remain visible.
    let count_after = db.node_count();
    assert_eq!(
        count_after, 1,
        "Rollback should discard uncommitted node, but got {count_after}"
    );

    let epoch_now = db.store().current_epoch();

    // The baseline node survives the rollback...
    let baseline_visible = db
        .store()
        .get_node_versioned(committed_node, epoch_now, TransactionId::SYSTEM)
        .is_some();
    assert!(baseline_visible, "Original node should still exist");

    // ...while the transaction's node is no longer reachable.
    let pending_visible = db
        .store()
        .get_node_versioned(pending_node, epoch_now, TransactionId::SYSTEM)
        .is_some();
    assert!(!pending_visible, "Transaction node should be gone");
}
4434
4435    #[test]
4436    fn test_session_create_node_in_transaction() {
4437        // Test that session.create_node() is transaction-aware
4438        let db = GrafeoDB::new_in_memory();
4439
4440        // Create a node outside of any transaction
4441        let node_before = db.create_node(&["Person"]);
4442        assert!(node_before.is_valid());
4443        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4444
4445        // Start a transaction and create a node through the session
4446        let mut session = db.session();
4447        session.begin_transaction().unwrap();
4448        let transaction_id = session.current_transaction.lock().unwrap();
4449
4450        // Create a node through session.create_node() - should be versioned with tx
4451        let node_in_tx = session.create_node(&["Person"]);
4452        assert!(node_in_tx.is_valid());
4453
4454        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4455        // non-versioned lookups. Verify the node is visible only to its own tx.
4456        assert_eq!(
4457            db.node_count(),
4458            1,
4459            "PENDING nodes should be invisible to non-versioned node_count()"
4460        );
4461        let epoch = db.store().current_epoch();
4462        assert!(
4463            db.store()
4464                .get_node_versioned(node_in_tx, epoch, transaction_id)
4465                .is_some(),
4466            "Transaction node should be visible to its own transaction"
4467        );
4468
4469        // Rollback the transaction
4470        session.rollback().unwrap();
4471
4472        // The node created via session.create_node() should be discarded
4473        let count_after = db.node_count();
4474        assert_eq!(
4475            count_after, 1,
4476            "Rollback should discard node created via session.create_node(), but got {count_after}"
4477        );
4478    }
4479
4480    #[test]
4481    fn test_session_create_node_with_props_in_transaction() {
4482        use grafeo_common::types::Value;
4483
4484        // Test that session.create_node_with_props() is transaction-aware
4485        let db = GrafeoDB::new_in_memory();
4486
4487        // Create a node outside of any transaction
4488        db.create_node(&["Person"]);
4489        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4490
4491        // Start a transaction and create a node with properties
4492        let mut session = db.session();
4493        session.begin_transaction().unwrap();
4494        let transaction_id = session.current_transaction.lock().unwrap();
4495
4496        let node_in_tx =
4497            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4498        assert!(node_in_tx.is_valid());
4499
4500        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4501        // non-versioned lookups. Verify the node is visible only to its own tx.
4502        assert_eq!(
4503            db.node_count(),
4504            1,
4505            "PENDING nodes should be invisible to non-versioned node_count()"
4506        );
4507        let epoch = db.store().current_epoch();
4508        assert!(
4509            db.store()
4510                .get_node_versioned(node_in_tx, epoch, transaction_id)
4511                .is_some(),
4512            "Transaction node should be visible to its own transaction"
4513        );
4514
4515        // Rollback the transaction
4516        session.rollback().unwrap();
4517
4518        // The node should be discarded
4519        let count_after = db.node_count();
4520        assert_eq!(
4521            count_after, 1,
4522            "Rollback should discard node created via session.create_node_with_props()"
4523        );
4524    }
4525
4526    #[cfg(feature = "gql")]
4527    mod gql_tests {
4528        use super::*;
4529
4530        #[test]
4531        fn test_gql_query_execution() {
4532            let db = GrafeoDB::new_in_memory();
4533            let session = db.session();
4534
4535            // Create some test data
4536            session.create_node(&["Person"]);
4537            session.create_node(&["Person"]);
4538            session.create_node(&["Animal"]);
4539
4540            // Execute a GQL query
4541            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
4542
4543            // Should return 2 Person nodes
4544            assert_eq!(result.row_count(), 2);
4545            assert_eq!(result.column_count(), 1);
4546            assert_eq!(result.columns[0], "n");
4547        }
4548
4549        #[test]
4550        fn test_gql_empty_result() {
4551            let db = GrafeoDB::new_in_memory();
4552            let session = db.session();
4553
4554            // No data in database
4555            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
4556
4557            assert_eq!(result.row_count(), 0);
4558        }
4559
4560        #[test]
4561        fn test_gql_parse_error() {
4562            let db = GrafeoDB::new_in_memory();
4563            let session = db.session();
4564
4565            // Invalid GQL syntax
4566            let result = session.execute("MATCH (n RETURN n");
4567
4568            assert!(result.is_err());
4569        }
4570
4571        #[test]
4572        fn test_gql_relationship_traversal() {
4573            let db = GrafeoDB::new_in_memory();
4574            let session = db.session();
4575
4576            // Create a graph: Alix -> Gus, Alix -> Vincent
4577            let alix = session.create_node(&["Person"]);
4578            let gus = session.create_node(&["Person"]);
4579            let vincent = session.create_node(&["Person"]);
4580
4581            session.create_edge(alix, gus, "KNOWS");
4582            session.create_edge(alix, vincent, "KNOWS");
4583
4584            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
4585            let result = session
4586                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4587                .unwrap();
4588
4589            // Should return 2 rows (Alix->Gus, Alix->Vincent)
4590            assert_eq!(result.row_count(), 2);
4591            assert_eq!(result.column_count(), 2);
4592            assert_eq!(result.columns[0], "a");
4593            assert_eq!(result.columns[1], "b");
4594        }
4595
4596        #[test]
4597        fn test_gql_relationship_with_type_filter() {
4598            let db = GrafeoDB::new_in_memory();
4599            let session = db.session();
4600
4601            // Create a graph: Alix -KNOWS-> Gus, Alix -WORKS_WITH-> Vincent
4602            let alix = session.create_node(&["Person"]);
4603            let gus = session.create_node(&["Person"]);
4604            let vincent = session.create_node(&["Person"]);
4605
4606            session.create_edge(alix, gus, "KNOWS");
4607            session.create_edge(alix, vincent, "WORKS_WITH");
4608
4609            // Query only KNOWS relationships
4610            let result = session
4611                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4612                .unwrap();
4613
4614            // Should return only 1 row (Alix->Gus)
4615            assert_eq!(result.row_count(), 1);
4616        }
4617
4618        #[test]
4619        fn test_gql_semantic_error_undefined_variable() {
4620            let db = GrafeoDB::new_in_memory();
4621            let session = db.session();
4622
4623            // Reference undefined variable 'x' in RETURN
4624            let result = session.execute("MATCH (n:Person) RETURN x");
4625
4626            // Should fail with semantic error
4627            assert!(result.is_err());
4628            let Err(err) = result else {
4629                panic!("Expected error")
4630            };
4631            assert!(
4632                err.to_string().contains("Undefined variable"),
4633                "Expected undefined variable error, got: {}",
4634                err
4635            );
4636        }
4637
4638        #[test]
4639        fn test_gql_where_clause_property_filter() {
4640            use grafeo_common::types::Value;
4641
4642            let db = GrafeoDB::new_in_memory();
4643            let session = db.session();
4644
4645            // Create people with ages
4646            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
4647            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
4648            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
4649
4650            // Query with WHERE clause: age > 30
4651            let result = session
4652                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
4653                .unwrap();
4654
4655            // Should return 2 people (ages 35 and 45)
4656            assert_eq!(result.row_count(), 2);
4657        }
4658
4659        #[test]
4660        fn test_gql_where_clause_equality() {
4661            use grafeo_common::types::Value;
4662
4663            let db = GrafeoDB::new_in_memory();
4664            let session = db.session();
4665
4666            // Create people with names
4667            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4668            session.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
4669            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4670
4671            // Query with WHERE clause: name = "Alix"
4672            let result = session
4673                .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
4674                .unwrap();
4675
4676            // Should return 2 people named Alix
4677            assert_eq!(result.row_count(), 2);
4678        }
4679
4680        #[test]
4681        fn test_gql_return_property_access() {
4682            use grafeo_common::types::Value;
4683
4684            let db = GrafeoDB::new_in_memory();
4685            let session = db.session();
4686
4687            // Create people with names and ages
4688            session.create_node_with_props(
4689                &["Person"],
4690                [
4691                    ("name", Value::String("Alix".into())),
4692                    ("age", Value::Int64(30)),
4693                ],
4694            );
4695            session.create_node_with_props(
4696                &["Person"],
4697                [
4698                    ("name", Value::String("Gus".into())),
4699                    ("age", Value::Int64(25)),
4700                ],
4701            );
4702
4703            // Query returning properties
4704            let result = session
4705                .execute("MATCH (n:Person) RETURN n.name, n.age")
4706                .unwrap();
4707
4708            // Should return 2 rows with name and age columns
4709            assert_eq!(result.row_count(), 2);
4710            assert_eq!(result.column_count(), 2);
4711            assert_eq!(result.columns[0], "n.name");
4712            assert_eq!(result.columns[1], "n.age");
4713
4714            // Check that we get actual values
4715            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
4716            assert!(names.contains(&&Value::String("Alix".into())));
4717            assert!(names.contains(&&Value::String("Gus".into())));
4718        }
4719
4720        #[test]
4721        fn test_gql_return_mixed_expressions() {
4722            use grafeo_common::types::Value;
4723
4724            let db = GrafeoDB::new_in_memory();
4725            let session = db.session();
4726
4727            // Create a person
4728            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4729
4730            // Query returning both node and property
4731            let result = session
4732                .execute("MATCH (n:Person) RETURN n, n.name")
4733                .unwrap();
4734
4735            assert_eq!(result.row_count(), 1);
4736            assert_eq!(result.column_count(), 2);
4737            assert_eq!(result.columns[0], "n");
4738            assert_eq!(result.columns[1], "n.name");
4739
4740            // Second column should be the name
4741            assert_eq!(result.rows[0][1], Value::String("Alix".into()));
4742        }
4743    }
4744
4745    #[cfg(feature = "cypher")]
4746    mod cypher_tests {
4747        use super::*;
4748
4749        #[test]
4750        fn test_cypher_query_execution() {
4751            let db = GrafeoDB::new_in_memory();
4752            let session = db.session();
4753
4754            // Create some test data
4755            session.create_node(&["Person"]);
4756            session.create_node(&["Person"]);
4757            session.create_node(&["Animal"]);
4758
4759            // Execute a Cypher query
4760            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4761
4762            // Should return 2 Person nodes
4763            assert_eq!(result.row_count(), 2);
4764            assert_eq!(result.column_count(), 1);
4765            assert_eq!(result.columns[0], "n");
4766        }
4767
4768        #[test]
4769        fn test_cypher_empty_result() {
4770            let db = GrafeoDB::new_in_memory();
4771            let session = db.session();
4772
4773            // No data in database
4774            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4775
4776            assert_eq!(result.row_count(), 0);
4777        }
4778
4779        #[test]
4780        fn test_cypher_parse_error() {
4781            let db = GrafeoDB::new_in_memory();
4782            let session = db.session();
4783
4784            // Invalid Cypher syntax
4785            let result = session.execute_cypher("MATCH (n RETURN n");
4786
4787            assert!(result.is_err());
4788        }
4789    }
4790
4791    // ==================== Direct Lookup API Tests ====================
4792
4793    mod direct_lookup_tests {
4794        use super::*;
4795        use grafeo_common::types::Value;
4796
4797        #[test]
4798        fn test_get_node() {
4799            let db = GrafeoDB::new_in_memory();
4800            let session = db.session();
4801
4802            let id = session.create_node(&["Person"]);
4803            let node = session.get_node(id);
4804
4805            assert!(node.is_some());
4806            let node = node.unwrap();
4807            assert_eq!(node.id, id);
4808        }
4809
4810        #[test]
4811        fn test_get_node_not_found() {
4812            use grafeo_common::types::NodeId;
4813
4814            let db = GrafeoDB::new_in_memory();
4815            let session = db.session();
4816
4817            // Try to get a non-existent node
4818            let node = session.get_node(NodeId::new(9999));
4819            assert!(node.is_none());
4820        }
4821
4822        #[test]
4823        fn test_get_node_property() {
4824            let db = GrafeoDB::new_in_memory();
4825            let session = db.session();
4826
4827            let id = session
4828                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4829
4830            let name = session.get_node_property(id, "name");
4831            assert_eq!(name, Some(Value::String("Alix".into())));
4832
4833            // Non-existent property
4834            let missing = session.get_node_property(id, "missing");
4835            assert!(missing.is_none());
4836        }
4837
4838        #[test]
4839        fn test_get_edge() {
4840            let db = GrafeoDB::new_in_memory();
4841            let session = db.session();
4842
4843            let alix = session.create_node(&["Person"]);
4844            let gus = session.create_node(&["Person"]);
4845            let edge_id = session.create_edge(alix, gus, "KNOWS");
4846
4847            let edge = session.get_edge(edge_id);
4848            assert!(edge.is_some());
4849            let edge = edge.unwrap();
4850            assert_eq!(edge.id, edge_id);
4851            assert_eq!(edge.src, alix);
4852            assert_eq!(edge.dst, gus);
4853        }
4854
4855        #[test]
4856        fn test_get_edge_not_found() {
4857            use grafeo_common::types::EdgeId;
4858
4859            let db = GrafeoDB::new_in_memory();
4860            let session = db.session();
4861
4862            let edge = session.get_edge(EdgeId::new(9999));
4863            assert!(edge.is_none());
4864        }
4865
4866        #[test]
4867        fn test_get_neighbors_outgoing() {
4868            let db = GrafeoDB::new_in_memory();
4869            let session = db.session();
4870
4871            let alix = session.create_node(&["Person"]);
4872            let gus = session.create_node(&["Person"]);
4873            let harm = session.create_node(&["Person"]);
4874
4875            session.create_edge(alix, gus, "KNOWS");
4876            session.create_edge(alix, harm, "KNOWS");
4877
4878            let neighbors = session.get_neighbors_outgoing(alix);
4879            assert_eq!(neighbors.len(), 2);
4880
4881            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4882            assert!(neighbor_ids.contains(&gus));
4883            assert!(neighbor_ids.contains(&harm));
4884        }
4885
4886        #[test]
4887        fn test_get_neighbors_incoming() {
4888            let db = GrafeoDB::new_in_memory();
4889            let session = db.session();
4890
4891            let alix = session.create_node(&["Person"]);
4892            let gus = session.create_node(&["Person"]);
4893            let harm = session.create_node(&["Person"]);
4894
4895            session.create_edge(gus, alix, "KNOWS");
4896            session.create_edge(harm, alix, "KNOWS");
4897
4898            let neighbors = session.get_neighbors_incoming(alix);
4899            assert_eq!(neighbors.len(), 2);
4900
4901            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4902            assert!(neighbor_ids.contains(&gus));
4903            assert!(neighbor_ids.contains(&harm));
4904        }
4905
4906        #[test]
4907        fn test_get_neighbors_outgoing_by_type() {
4908            let db = GrafeoDB::new_in_memory();
4909            let session = db.session();
4910
4911            let alix = session.create_node(&["Person"]);
4912            let gus = session.create_node(&["Person"]);
4913            let company = session.create_node(&["Company"]);
4914
4915            session.create_edge(alix, gus, "KNOWS");
4916            session.create_edge(alix, company, "WORKS_AT");
4917
4918            let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
4919            assert_eq!(knows_neighbors.len(), 1);
4920            assert_eq!(knows_neighbors[0].0, gus);
4921
4922            let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
4923            assert_eq!(works_neighbors.len(), 1);
4924            assert_eq!(works_neighbors[0].0, company);
4925
4926            // No edges of this type
4927            let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
4928            assert!(no_neighbors.is_empty());
4929        }
4930
4931        #[test]
4932        fn test_node_exists() {
4933            use grafeo_common::types::NodeId;
4934
4935            let db = GrafeoDB::new_in_memory();
4936            let session = db.session();
4937
4938            let id = session.create_node(&["Person"]);
4939
4940            assert!(session.node_exists(id));
4941            assert!(!session.node_exists(NodeId::new(9999)));
4942        }
4943
4944        #[test]
4945        fn test_edge_exists() {
4946            use grafeo_common::types::EdgeId;
4947
4948            let db = GrafeoDB::new_in_memory();
4949            let session = db.session();
4950
4951            let alix = session.create_node(&["Person"]);
4952            let gus = session.create_node(&["Person"]);
4953            let edge_id = session.create_edge(alix, gus, "KNOWS");
4954
4955            assert!(session.edge_exists(edge_id));
4956            assert!(!session.edge_exists(EdgeId::new(9999)));
4957        }
4958
4959        #[test]
4960        fn test_get_degree() {
4961            let db = GrafeoDB::new_in_memory();
4962            let session = db.session();
4963
4964            let alix = session.create_node(&["Person"]);
4965            let gus = session.create_node(&["Person"]);
4966            let harm = session.create_node(&["Person"]);
4967
4968            // Alix knows Gus and Harm (2 outgoing)
4969            session.create_edge(alix, gus, "KNOWS");
4970            session.create_edge(alix, harm, "KNOWS");
4971            // Gus knows Alix (1 incoming for Alix)
4972            session.create_edge(gus, alix, "KNOWS");
4973
4974            let (out_degree, in_degree) = session.get_degree(alix);
4975            assert_eq!(out_degree, 2);
4976            assert_eq!(in_degree, 1);
4977
4978            // Node with no edges
4979            let lonely = session.create_node(&["Person"]);
4980            let (out, in_deg) = session.get_degree(lonely);
4981            assert_eq!(out, 0);
4982            assert_eq!(in_deg, 0);
4983        }
4984
4985        #[test]
4986        fn test_get_nodes_batch() {
4987            let db = GrafeoDB::new_in_memory();
4988            let session = db.session();
4989
4990            let alix = session.create_node(&["Person"]);
4991            let gus = session.create_node(&["Person"]);
4992            let harm = session.create_node(&["Person"]);
4993
4994            let nodes = session.get_nodes_batch(&[alix, gus, harm]);
4995            assert_eq!(nodes.len(), 3);
4996            assert!(nodes[0].is_some());
4997            assert!(nodes[1].is_some());
4998            assert!(nodes[2].is_some());
4999
5000            // With non-existent node
5001            use grafeo_common::types::NodeId;
5002            let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
5003            assert_eq!(nodes_with_missing.len(), 3);
5004            assert!(nodes_with_missing[0].is_some());
5005            assert!(nodes_with_missing[1].is_none()); // Missing node
5006            assert!(nodes_with_missing[2].is_some());
5007        }
5008
5009        #[test]
5010        fn test_auto_commit_setting() {
5011            let db = GrafeoDB::new_in_memory();
5012            let mut session = db.session();
5013
5014            // Default is auto-commit enabled
5015            assert!(session.auto_commit());
5016
5017            session.set_auto_commit(false);
5018            assert!(!session.auto_commit());
5019
5020            session.set_auto_commit(true);
5021            assert!(session.auto_commit());
5022        }
5023
5024        #[test]
5025        fn test_transaction_double_begin_nests() {
5026            let db = GrafeoDB::new_in_memory();
5027            let mut session = db.session();
5028
5029            session.begin_transaction().unwrap();
5030            // Second begin_transaction creates a nested transaction (auto-savepoint)
5031            let result = session.begin_transaction();
5032            assert!(result.is_ok());
5033            // Commit the inner (releases savepoint)
5034            session.commit().unwrap();
5035            // Commit the outer
5036            session.commit().unwrap();
5037        }
5038
5039        #[test]
5040        fn test_commit_without_transaction_error() {
5041            let db = GrafeoDB::new_in_memory();
5042            let mut session = db.session();
5043
5044            let result = session.commit();
5045            assert!(result.is_err());
5046        }
5047
5048        #[test]
5049        fn test_rollback_without_transaction_error() {
5050            let db = GrafeoDB::new_in_memory();
5051            let mut session = db.session();
5052
5053            let result = session.rollback();
5054            assert!(result.is_err());
5055        }
5056
5057        #[test]
5058        fn test_create_edge_in_transaction() {
5059            let db = GrafeoDB::new_in_memory();
5060            let mut session = db.session();
5061
5062            // Create nodes outside transaction
5063            let alix = session.create_node(&["Person"]);
5064            let gus = session.create_node(&["Person"]);
5065
5066            // Create edge in transaction
5067            session.begin_transaction().unwrap();
5068            let edge_id = session.create_edge(alix, gus, "KNOWS");
5069
5070            // Edge should be visible in the transaction
5071            assert!(session.edge_exists(edge_id));
5072
5073            // Commit
5074            session.commit().unwrap();
5075
5076            // Edge should still be visible
5077            assert!(session.edge_exists(edge_id));
5078        }
5079
5080        #[test]
5081        fn test_neighbors_empty_node() {
5082            let db = GrafeoDB::new_in_memory();
5083            let session = db.session();
5084
5085            let lonely = session.create_node(&["Person"]);
5086
5087            assert!(session.get_neighbors_outgoing(lonely).is_empty());
5088            assert!(session.get_neighbors_incoming(lonely).is_empty());
5089            assert!(
5090                session
5091                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
5092                    .is_empty()
5093            );
5094        }
5095    }
5096
5097    #[test]
5098    fn test_auto_gc_triggers_on_commit_interval() {
5099        use crate::config::Config;
5100
5101        let config = Config::in_memory().with_gc_interval(2);
5102        let db = GrafeoDB::with_config(config).unwrap();
5103        let mut session = db.session();
5104
5105        // First commit: counter = 1, no GC (not a multiple of 2)
5106        session.begin_transaction().unwrap();
5107        session.create_node(&["A"]);
5108        session.commit().unwrap();
5109
5110        // Second commit: counter = 2, GC should trigger (multiple of 2)
5111        session.begin_transaction().unwrap();
5112        session.create_node(&["B"]);
5113        session.commit().unwrap();
5114
5115        // Verify the database is still functional after GC
5116        assert_eq!(db.node_count(), 2);
5117    }
5118
5119    #[test]
5120    fn test_query_timeout_config_propagates_to_session() {
5121        use crate::config::Config;
5122        use std::time::Duration;
5123
5124        let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
5125        let db = GrafeoDB::with_config(config).unwrap();
5126        let session = db.session();
5127
5128        // Verify the session has a query deadline (timeout was set)
5129        assert!(session.query_deadline().is_some());
5130    }
5131
5132    #[test]
5133    fn test_no_query_timeout_returns_no_deadline() {
5134        let db = GrafeoDB::new_in_memory();
5135        let session = db.session();
5136
5137        // Default config has no timeout
5138        assert!(session.query_deadline().is_none());
5139    }
5140
5141    #[test]
5142    fn test_graph_model_accessor() {
5143        use crate::config::GraphModel;
5144
5145        let db = GrafeoDB::new_in_memory();
5146        let session = db.session();
5147
5148        assert_eq!(session.graph_model(), GraphModel::Lpg);
5149    }
5150
5151    #[cfg(feature = "gql")]
5152    #[test]
5153    fn test_external_store_session() {
5154        use grafeo_core::graph::GraphStoreMut;
5155        use std::sync::Arc;
5156
5157        let config = crate::config::Config::in_memory();
5158        let store =
5159            Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
5160        let db = GrafeoDB::with_store(store, config).unwrap();
5161
5162        let mut session = db.session();
5163
5164        // Use an explicit transaction so that INSERT and MATCH share the same
5165        // transaction context. With PENDING epochs, uncommitted versions are
5166        // only visible to the owning transaction.
5167        session.begin_transaction().unwrap();
5168        session.execute("INSERT (:Test {name: 'hello'})").unwrap();
5169
5170        // Verify we can query through it within the same transaction
5171        let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
5172        assert_eq!(result.row_count(), 1);
5173
5174        session.commit().unwrap();
5175    }
5176
5177    // ==================== Session Command Tests ====================
5178
5179    #[cfg(feature = "gql")]
5180    mod session_command_tests {
5181        use super::*;
5182        use grafeo_common::types::Value;
5183
5184        #[test]
5185        fn test_use_graph_sets_current_graph() {
5186            let db = GrafeoDB::new_in_memory();
5187            let session = db.session();
5188
5189            // Create the graph first, then USE it
5190            session.execute("CREATE GRAPH mydb").unwrap();
5191            session.execute("USE GRAPH mydb").unwrap();
5192
5193            assert_eq!(session.current_graph(), Some("mydb".to_string()));
5194        }
5195
5196        #[test]
5197        fn test_use_graph_nonexistent_errors() {
5198            let db = GrafeoDB::new_in_memory();
5199            let session = db.session();
5200
5201            let result = session.execute("USE GRAPH doesnotexist");
5202            assert!(result.is_err());
5203            let err = result.unwrap_err().to_string();
5204            assert!(
5205                err.contains("does not exist"),
5206                "Expected 'does not exist' error, got: {err}"
5207            );
5208        }
5209
5210        #[test]
5211        fn test_use_graph_default_always_valid() {
5212            let db = GrafeoDB::new_in_memory();
5213            let session = db.session();
5214
5215            // "default" is always valid, even without CREATE GRAPH
5216            session.execute("USE GRAPH default").unwrap();
5217            assert_eq!(session.current_graph(), Some("default".to_string()));
5218        }
5219
5220        #[test]
5221        fn test_session_set_graph() {
5222            let db = GrafeoDB::new_in_memory();
5223            let session = db.session();
5224
5225            session.execute("CREATE GRAPH analytics").unwrap();
5226            session.execute("SESSION SET GRAPH analytics").unwrap();
5227            assert_eq!(session.current_graph(), Some("analytics".to_string()));
5228        }
5229
5230        #[test]
5231        fn test_session_set_graph_nonexistent_errors() {
5232            let db = GrafeoDB::new_in_memory();
5233            let session = db.session();
5234
5235            let result = session.execute("SESSION SET GRAPH nosuchgraph");
5236            assert!(result.is_err());
5237        }
5238
5239        #[test]
5240        fn test_session_set_time_zone() {
5241            let db = GrafeoDB::new_in_memory();
5242            let session = db.session();
5243
5244            assert_eq!(session.time_zone(), None);
5245
5246            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5247            assert_eq!(session.time_zone(), Some("UTC".to_string()));
5248
5249            session
5250                .execute("SESSION SET TIME ZONE 'America/New_York'")
5251                .unwrap();
5252            assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
5253        }
5254
5255        #[test]
5256        fn test_session_set_parameter() {
5257            let db = GrafeoDB::new_in_memory();
5258            let session = db.session();
5259
5260            session
5261                .execute("SESSION SET PARAMETER $timeout = 30")
5262                .unwrap();
5263
5264            // Parameter is stored (value is Null for now, since expression
5265            // evaluation is not yet wired up)
5266            assert!(session.get_parameter("timeout").is_some());
5267        }
5268
5269        #[test]
5270        fn test_session_reset_clears_all_state() {
5271            let db = GrafeoDB::new_in_memory();
5272            let session = db.session();
5273
5274            // Set various session state
5275            session.execute("CREATE GRAPH analytics").unwrap();
5276            session.execute("SESSION SET GRAPH analytics").unwrap();
5277            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5278            session
5279                .execute("SESSION SET PARAMETER $limit = 100")
5280                .unwrap();
5281
5282            // Verify state was set
5283            assert!(session.current_graph().is_some());
5284            assert!(session.time_zone().is_some());
5285            assert!(session.get_parameter("limit").is_some());
5286
5287            // Reset everything
5288            session.execute("SESSION RESET").unwrap();
5289
5290            assert_eq!(session.current_graph(), None);
5291            assert_eq!(session.time_zone(), None);
5292            assert!(session.get_parameter("limit").is_none());
5293        }
5294
5295        #[test]
5296        fn test_session_close_clears_state() {
5297            let db = GrafeoDB::new_in_memory();
5298            let session = db.session();
5299
5300            session.execute("CREATE GRAPH analytics").unwrap();
5301            session.execute("SESSION SET GRAPH analytics").unwrap();
5302            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5303
5304            session.execute("SESSION CLOSE").unwrap();
5305
5306            assert_eq!(session.current_graph(), None);
5307            assert_eq!(session.time_zone(), None);
5308        }
5309
5310        #[test]
5311        fn test_create_graph() {
5312            let db = GrafeoDB::new_in_memory();
5313            let session = db.session();
5314
5315            session.execute("CREATE GRAPH mydb").unwrap();
5316
5317            // Should be able to USE it now
5318            session.execute("USE GRAPH mydb").unwrap();
5319            assert_eq!(session.current_graph(), Some("mydb".to_string()));
5320        }
5321
5322        #[test]
5323        fn test_create_graph_duplicate_errors() {
5324            let db = GrafeoDB::new_in_memory();
5325            let session = db.session();
5326
5327            session.execute("CREATE GRAPH mydb").unwrap();
5328            let result = session.execute("CREATE GRAPH mydb");
5329
5330            assert!(result.is_err());
5331            let err = result.unwrap_err().to_string();
5332            assert!(
5333                err.contains("already exists"),
5334                "Expected 'already exists' error, got: {err}"
5335            );
5336        }
5337
5338        #[test]
5339        fn test_create_graph_if_not_exists() {
5340            let db = GrafeoDB::new_in_memory();
5341            let session = db.session();
5342
5343            session.execute("CREATE GRAPH mydb").unwrap();
5344            // Should succeed silently with IF NOT EXISTS
5345            session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
5346        }
5347
5348        #[test]
5349        fn test_drop_graph() {
5350            let db = GrafeoDB::new_in_memory();
5351            let session = db.session();
5352
5353            session.execute("CREATE GRAPH mydb").unwrap();
5354            session.execute("DROP GRAPH mydb").unwrap();
5355
5356            // Should no longer be usable
5357            let result = session.execute("USE GRAPH mydb");
5358            assert!(result.is_err());
5359        }
5360
5361        #[test]
5362        fn test_drop_graph_nonexistent_errors() {
5363            let db = GrafeoDB::new_in_memory();
5364            let session = db.session();
5365
5366            let result = session.execute("DROP GRAPH nosuchgraph");
5367            assert!(result.is_err());
5368            let err = result.unwrap_err().to_string();
5369            assert!(
5370                err.contains("does not exist"),
5371                "Expected 'does not exist' error, got: {err}"
5372            );
5373        }
5374
5375        #[test]
5376        fn test_drop_graph_if_exists() {
5377            let db = GrafeoDB::new_in_memory();
5378            let session = db.session();
5379
5380            // Should succeed silently with IF EXISTS
5381            session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
5382        }
5383
5384        #[test]
5385        fn test_start_transaction_via_gql() {
5386            let db = GrafeoDB::new_in_memory();
5387            let session = db.session();
5388
5389            session.execute("START TRANSACTION").unwrap();
5390            assert!(session.in_transaction());
5391            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5392            session.execute("COMMIT").unwrap();
5393            assert!(!session.in_transaction());
5394
5395            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5396            assert_eq!(result.rows.len(), 1);
5397        }
5398
5399        #[test]
5400        fn test_start_transaction_read_only_blocks_insert() {
5401            let db = GrafeoDB::new_in_memory();
5402            let session = db.session();
5403
5404            session.execute("START TRANSACTION READ ONLY").unwrap();
5405            let result = session.execute("INSERT (:Person {name: 'Alix'})");
5406            assert!(result.is_err());
5407            let err = result.unwrap_err().to_string();
5408            assert!(
5409                err.contains("read-only"),
5410                "Expected read-only error, got: {err}"
5411            );
5412            session.execute("ROLLBACK").unwrap();
5413        }
5414
5415        #[test]
5416        fn test_start_transaction_read_only_allows_reads() {
5417            let db = GrafeoDB::new_in_memory();
5418            let mut session = db.session();
5419            session.begin_transaction().unwrap();
5420            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5421            session.commit().unwrap();
5422
5423            session.execute("START TRANSACTION READ ONLY").unwrap();
5424            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5425            assert_eq!(result.rows.len(), 1);
5426            session.execute("COMMIT").unwrap();
5427        }
5428
5429        #[test]
5430        fn test_rollback_via_gql() {
5431            let db = GrafeoDB::new_in_memory();
5432            let session = db.session();
5433
5434            session.execute("START TRANSACTION").unwrap();
5435            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5436            session.execute("ROLLBACK").unwrap();
5437
5438            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5439            assert!(result.rows.is_empty());
5440        }
5441
5442        #[test]
5443        fn test_start_transaction_with_isolation_level() {
5444            let db = GrafeoDB::new_in_memory();
5445            let session = db.session();
5446
5447            session
5448                .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
5449                .unwrap();
5450            assert!(session.in_transaction());
5451            session.execute("ROLLBACK").unwrap();
5452        }
5453
5454        #[test]
5455        fn test_session_commands_return_empty_result() {
5456            let db = GrafeoDB::new_in_memory();
5457            let session = db.session();
5458
5459            session.execute("CREATE GRAPH test").unwrap();
5460            let result = session.execute("SESSION SET GRAPH test").unwrap();
5461            assert_eq!(result.row_count(), 0);
5462            assert_eq!(result.column_count(), 0);
5463        }
5464
5465        #[test]
5466        fn test_current_graph_default_is_none() {
5467            let db = GrafeoDB::new_in_memory();
5468            let session = db.session();
5469
5470            assert_eq!(session.current_graph(), None);
5471        }
5472
5473        #[test]
5474        fn test_time_zone_default_is_none() {
5475            let db = GrafeoDB::new_in_memory();
5476            let session = db.session();
5477
5478            assert_eq!(session.time_zone(), None);
5479        }
5480
5481        #[test]
5482        fn test_session_state_independent_across_sessions() {
5483            let db = GrafeoDB::new_in_memory();
5484            let session1 = db.session();
5485            let session2 = db.session();
5486
5487            session1.execute("CREATE GRAPH first").unwrap();
5488            session1.execute("CREATE GRAPH second").unwrap();
5489            session1.execute("SESSION SET GRAPH first").unwrap();
5490            session2.execute("SESSION SET GRAPH second").unwrap();
5491
5492            assert_eq!(session1.current_graph(), Some("first".to_string()));
5493            assert_eq!(session2.current_graph(), Some("second".to_string()));
5494        }
5495
5496        #[test]
5497        fn test_show_node_types() {
5498            let db = GrafeoDB::new_in_memory();
5499            let session = db.session();
5500
5501            session
5502                .execute("CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)")
5503                .unwrap();
5504
5505            let result = session.execute("SHOW NODE TYPES").unwrap();
5506            assert_eq!(
5507                result.columns,
5508                vec!["name", "properties", "constraints", "parents"]
5509            );
5510            assert_eq!(result.rows.len(), 1);
5511            // First column is the type name
5512            assert_eq!(result.rows[0][0], Value::from("Person"));
5513        }
5514
5515        #[test]
5516        fn test_show_edge_types() {
5517            let db = GrafeoDB::new_in_memory();
5518            let session = db.session();
5519
5520            session
5521                .execute("CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)")
5522                .unwrap();
5523
5524            let result = session.execute("SHOW EDGE TYPES").unwrap();
5525            assert_eq!(
5526                result.columns,
5527                vec!["name", "properties", "source_types", "target_types"]
5528            );
5529            assert_eq!(result.rows.len(), 1);
5530            assert_eq!(result.rows[0][0], Value::from("KNOWS"));
5531        }
5532
5533        #[test]
5534        fn test_show_graph_types() {
5535            let db = GrafeoDB::new_in_memory();
5536            let session = db.session();
5537
5538            session
5539                .execute("CREATE NODE TYPE Person (name STRING)")
5540                .unwrap();
5541            session
5542                .execute(
5543                    "CREATE GRAPH TYPE social (\
5544                        NODE TYPE Person (name STRING)\
5545                    )",
5546                )
5547                .unwrap();
5548
5549            let result = session.execute("SHOW GRAPH TYPES").unwrap();
5550            assert_eq!(
5551                result.columns,
5552                vec!["name", "open", "node_types", "edge_types"]
5553            );
5554            assert_eq!(result.rows.len(), 1);
5555            assert_eq!(result.rows[0][0], Value::from("social"));
5556        }
5557
5558        #[test]
5559        fn test_show_graph_type_named() {
5560            let db = GrafeoDB::new_in_memory();
5561            let session = db.session();
5562
5563            session
5564                .execute("CREATE NODE TYPE Person (name STRING)")
5565                .unwrap();
5566            session
5567                .execute(
5568                    "CREATE GRAPH TYPE social (\
5569                        NODE TYPE Person (name STRING)\
5570                    )",
5571                )
5572                .unwrap();
5573
5574            let result = session.execute("SHOW GRAPH TYPE social").unwrap();
5575            assert_eq!(result.rows.len(), 1);
5576            assert_eq!(result.rows[0][0], Value::from("social"));
5577        }
5578
5579        #[test]
5580        fn test_show_graph_type_not_found() {
5581            let db = GrafeoDB::new_in_memory();
5582            let session = db.session();
5583
5584            let result = session.execute("SHOW GRAPH TYPE nonexistent");
5585            assert!(result.is_err());
5586        }
5587
5588        #[test]
5589        fn test_show_indexes_via_gql() {
5590            let db = GrafeoDB::new_in_memory();
5591            let session = db.session();
5592
5593            let result = session.execute("SHOW INDEXES").unwrap();
5594            assert_eq!(result.columns, vec!["name", "type", "label", "property"]);
5595        }
5596
5597        #[test]
5598        fn test_show_constraints_via_gql() {
5599            let db = GrafeoDB::new_in_memory();
5600            let session = db.session();
5601
5602            let result = session.execute("SHOW CONSTRAINTS").unwrap();
5603            assert_eq!(result.columns, vec!["name", "type", "label", "properties"]);
5604        }
5605
5606        #[test]
5607        fn test_pattern_form_graph_type_roundtrip() {
5608            let db = GrafeoDB::new_in_memory();
5609            let session = db.session();
5610
5611            // Register the types first
5612            session
5613                .execute("CREATE NODE TYPE Person (name STRING NOT NULL)")
5614                .unwrap();
5615            session
5616                .execute("CREATE NODE TYPE City (name STRING)")
5617                .unwrap();
5618            session
5619                .execute("CREATE EDGE TYPE KNOWS (since INTEGER)")
5620                .unwrap();
5621            session.execute("CREATE EDGE TYPE LIVES_IN").unwrap();
5622
5623            // Create graph type using pattern form
5624            session
5625                .execute(
5626                    "CREATE GRAPH TYPE social (\
5627                        (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
5628                        (:Person)-[:LIVES_IN]->(:City)\
5629                    )",
5630                )
5631                .unwrap();
5632
5633            // Verify it was created
5634            let result = session.execute("SHOW GRAPH TYPE social").unwrap();
5635            assert_eq!(result.rows.len(), 1);
5636            assert_eq!(result.rows[0][0], Value::from("social"));
5637        }
5638    }
5639}