Skip to main content

grafeo_engine/session/
mod.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7#[cfg(feature = "rdf")]
8mod rdf;
9
10use std::sync::Arc;
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::time::{Duration, Instant};
13
14use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId, Value};
15use grafeo_common::utils::error::Result;
16use grafeo_common::{grafeo_debug_span, grafeo_info_span, grafeo_warn};
17use grafeo_core::graph::Direction;
18use grafeo_core::graph::GraphStoreMut;
19use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
20#[cfg(feature = "rdf")]
21use grafeo_core::graph::rdf::RdfStore;
22
23use crate::catalog::{Catalog, CatalogConstraintValidator};
24use crate::config::{AdaptiveConfig, GraphModel};
25use crate::database::QueryResult;
26use crate::query::cache::QueryCache;
27use crate::transaction::TransactionManager;
28
29/// Parses a DDL default-value literal string into a [`Value`].
30///
31/// Handles string literals (single- or double-quoted), integers, floats,
32/// booleans (`true`/`false`), and `NULL`.
33fn parse_default_literal(text: &str) -> Value {
34    if text.eq_ignore_ascii_case("null") {
35        return Value::Null;
36    }
37    if text.eq_ignore_ascii_case("true") {
38        return Value::Bool(true);
39    }
40    if text.eq_ignore_ascii_case("false") {
41        return Value::Bool(false);
42    }
43    // String literal: strip surrounding quotes
44    if (text.starts_with('\'') && text.ends_with('\''))
45        || (text.starts_with('"') && text.ends_with('"'))
46    {
47        return Value::String(text[1..text.len() - 1].into());
48    }
49    // Try integer, then float
50    if let Ok(i) = text.parse::<i64>() {
51        return Value::Int64(i);
52    }
53    if let Ok(f) = text.parse::<f64>() {
54        return Value::Float64(f);
55    }
56    // Fallback: treat as string
57    Value::String(text.into())
58}
59
/// Runtime configuration for creating a new session.
///
/// Groups the shared parameters passed to all session constructors, keeping
/// call sites readable and avoiding long argument lists. The constructors
/// move each field into the identically named field of [`Session`].
pub(crate) struct SessionConfig {
    /// Transaction manager the session will use.
    pub transaction_manager: Arc<TransactionManager>,
    /// Query cache shared across sessions.
    pub query_cache: Arc<QueryCache>,
    /// Schema and metadata catalog shared across sessions.
    pub catalog: Arc<Catalog>,
    /// Adaptive execution configuration.
    pub adaptive_config: AdaptiveConfig,
    /// Whether to use factorized execution for multi-hop queries.
    pub factorized_execution: bool,
    /// The graph data model the session operates on.
    pub graph_model: GraphModel,
    /// Maximum time a query may run before being cancelled (`None` = no limit set).
    pub query_timeout: Option<Duration>,
    /// Shared commit counter for triggering auto-GC.
    pub commit_counter: Arc<AtomicUsize>,
    /// GC every N commits (0 = disabled).
    pub gc_interval: usize,
    /// When true, the session permanently blocks all mutations.
    ///
    /// Seeds both `Session::read_only_tx` and `Session::db_read_only`.
    pub read_only: bool,
}
77
/// Your handle to the database - execute queries and manage transactions.
///
/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
/// tracks its own transaction state, so you can have multiple concurrent
/// sessions without them interfering.
///
/// All per-session mutable state sits behind `parking_lot::Mutex` (or an
/// atomic), which is what lets the public API take `&self`.
pub struct Session {
    /// The underlying (root) LPG store. Named graphs are looked up through it
    /// with `graph(name)`.
    store: Arc<LpgStore>,
    /// Graph store trait object for pluggable storage backends.
    graph_store: Arc<dyn GraphStoreMut>,
    /// Schema and metadata catalog shared across sessions.
    catalog: Arc<Catalog>,
    /// RDF triple store (if RDF feature is enabled).
    #[cfg(feature = "rdf")]
    rdf_store: Arc<RdfStore>,
    /// Transaction manager.
    transaction_manager: Arc<TransactionManager>,
    /// Query cache shared across sessions.
    query_cache: Arc<QueryCache>,
    /// Current transaction ID (if any). Behind a Mutex so that GQL commands
    /// (`START TRANSACTION`, `COMMIT`, `ROLLBACK`) can manage transactions
    /// from within `execute(&self)`.
    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
    /// Whether the current transaction is read-only (blocks mutations).
    /// Initialised from `SessionConfig::read_only`.
    read_only_tx: parking_lot::Mutex<bool>,
    /// Whether the database itself is read-only (set at open time, never changes).
    /// When true, `read_only_tx` is always true regardless of transaction flags.
    db_read_only: bool,
    /// Whether the session is in auto-commit mode.
    auto_commit: bool,
    /// Adaptive execution configuration.
    #[allow(dead_code)] // Stored for future adaptive re-optimization during execution
    adaptive_config: AdaptiveConfig,
    /// Whether to use factorized execution for multi-hop queries.
    factorized_execution: bool,
    /// The graph data model this session operates on.
    graph_model: GraphModel,
    /// Maximum time a query may run before being cancelled.
    query_timeout: Option<Duration>,
    /// Shared commit counter for triggering auto-GC.
    commit_counter: Arc<AtomicUsize>,
    /// GC every N commits (0 = disabled).
    gc_interval: usize,
    /// Node count at the start of the current transaction (for PreparedCommit stats).
    transaction_start_node_count: AtomicUsize,
    /// Edge count at the start of the current transaction (for PreparedCommit stats).
    transaction_start_edge_count: AtomicUsize,
    /// WAL for logging schema changes. `None` until `set_wal` is called.
    #[cfg(feature = "wal")]
    wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
    /// Shared WAL graph context tracker for named graph awareness.
    /// `None` until `set_wal` is called.
    #[cfg(feature = "wal")]
    wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
    /// CDC log for change tracking.
    #[cfg(feature = "cdc")]
    cdc_log: Arc<crate::cdc::CdcLog>,
    /// Current graph name (for multi-graph USE GRAPH support). None = default graph.
    current_graph: parking_lot::Mutex<Option<String>>,
    /// Current schema name (ISO/IEC 39075 Section 4.7.3: independent from session graph).
    /// None = "not set" (uses default schema).
    current_schema: parking_lot::Mutex<Option<String>>,
    /// Session time zone override.
    time_zone: parking_lot::Mutex<Option<String>>,
    /// Session-level parameters (SET PARAMETER).
    session_params:
        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
    /// Override epoch for time-travel queries (None = use transaction/current epoch).
    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
    /// Savepoints within the current transaction.
    savepoints: parking_lot::Mutex<Vec<SavepointState>>,
    /// Nesting depth for nested transactions (0 = outermost).
    /// Nested `START TRANSACTION` creates an auto-savepoint; nested `COMMIT`
    /// releases it, nested `ROLLBACK` rolls back to it.
    transaction_nesting_depth: parking_lot::Mutex<u32>,
    /// Named graphs touched during the current transaction (for cross-graph atomicity).
    /// `None` represents the default graph. Populated at `BEGIN` time and on each
    /// `USE GRAPH` / `SESSION SET GRAPH` switch within a transaction.
    touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
    /// Shared metrics registry (populated when the `metrics` feature is enabled).
    /// `None` until `set_metrics` is called.
    #[cfg(feature = "metrics")]
    pub(crate) metrics: Option<Arc<crate::metrics::MetricsRegistry>>,
    /// Transaction start time for duration tracking.
    #[cfg(feature = "metrics")]
    tx_start_time: parking_lot::Mutex<Option<Instant>>,
}
163
/// Per-graph savepoint snapshot, capturing the store state at the time of the savepoint.
#[derive(Clone)]
struct GraphSavepoint {
    /// Graph the snapshot belongs to; `None` is the default graph (same
    /// convention as `Session::touched_graphs`).
    graph_name: Option<String>,
    /// Next node ID recorded for that graph when the savepoint was taken.
    next_node_id: u64,
    /// Next edge ID recorded for that graph when the savepoint was taken.
    next_edge_id: u64,
    /// Undo-log position recorded when the savepoint was taken.
    undo_log_position: usize,
}
172
/// Savepoint state: name + per-graph snapshots + the graph that was active.
#[derive(Clone)]
struct SavepointState {
    /// Savepoint name.
    name: String,
    /// One snapshot per graph captured by this savepoint.
    graph_snapshots: Vec<GraphSavepoint>,
    /// The graph that was active when the savepoint was created.
    /// Reserved for future use (e.g., restoring graph context on rollback).
    #[allow(dead_code)]
    active_graph: Option<String>,
}
183
184impl Session {
185    /// Creates a new session with adaptive execution configuration.
186    #[allow(dead_code)]
187    pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
188        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
189        Self {
190            store,
191            graph_store,
192            catalog: cfg.catalog,
193            #[cfg(feature = "rdf")]
194            rdf_store: Arc::new(RdfStore::new()),
195            transaction_manager: cfg.transaction_manager,
196            query_cache: cfg.query_cache,
197            current_transaction: parking_lot::Mutex::new(None),
198            read_only_tx: parking_lot::Mutex::new(cfg.read_only),
199            db_read_only: cfg.read_only,
200            auto_commit: true,
201            adaptive_config: cfg.adaptive_config,
202            factorized_execution: cfg.factorized_execution,
203            graph_model: cfg.graph_model,
204            query_timeout: cfg.query_timeout,
205            commit_counter: cfg.commit_counter,
206            gc_interval: cfg.gc_interval,
207            transaction_start_node_count: AtomicUsize::new(0),
208            transaction_start_edge_count: AtomicUsize::new(0),
209            #[cfg(feature = "wal")]
210            wal: None,
211            #[cfg(feature = "wal")]
212            wal_graph_context: None,
213            #[cfg(feature = "cdc")]
214            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
215            current_graph: parking_lot::Mutex::new(None),
216            current_schema: parking_lot::Mutex::new(None),
217            time_zone: parking_lot::Mutex::new(None),
218            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
219            viewing_epoch_override: parking_lot::Mutex::new(None),
220            savepoints: parking_lot::Mutex::new(Vec::new()),
221            transaction_nesting_depth: parking_lot::Mutex::new(0),
222            touched_graphs: parking_lot::Mutex::new(Vec::new()),
223            #[cfg(feature = "metrics")]
224            metrics: None,
225            #[cfg(feature = "metrics")]
226            tx_start_time: parking_lot::Mutex::new(None),
227        }
228    }
229
230    /// Sets the WAL for this session (shared with the database).
231    ///
232    /// This also wraps `graph_store` in a [`WalGraphStore`] so that mutation
233    /// operators (INSERT, DELETE, SET via queries) log to the WAL.
234    #[cfg(feature = "wal")]
235    pub(crate) fn set_wal(
236        &mut self,
237        wal: Arc<grafeo_adapters::storage::wal::LpgWal>,
238        wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
239    ) {
240        // Wrap the graph store so query-engine mutations are WAL-logged
241        self.graph_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
242            Arc::clone(&self.store),
243            Arc::clone(&wal),
244            Arc::clone(&wal_graph_context),
245        ));
246        self.wal = Some(wal);
247        self.wal_graph_context = Some(wal_graph_context);
248    }
249
    /// Sets the CDC log for this session (shared with the database).
    ///
    /// Replaces the private, freshly created log that the constructors install.
    #[cfg(feature = "cdc")]
    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
        self.cdc_log = cdc_log;
    }
255
    /// Sets the metrics registry for this session (shared with the database).
    ///
    /// The constructors leave `metrics` as `None`; after this call it is `Some`.
    #[cfg(feature = "metrics")]
    pub(crate) fn set_metrics(&mut self, metrics: Arc<crate::metrics::MetricsRegistry>) {
        self.metrics = Some(metrics);
    }
261
262    /// Creates a session backed by an external graph store.
263    ///
264    /// The external store handles all data operations. Transaction management
265    /// (begin/commit/rollback) is not supported for external stores.
266    ///
267    /// # Errors
268    ///
269    /// Returns an error if the internal arena allocation fails (out of memory).
270    pub(crate) fn with_external_store(
271        store: Arc<dyn GraphStoreMut>,
272        cfg: SessionConfig,
273    ) -> Result<Self> {
274        Ok(Self {
275            store: Arc::new(LpgStore::new()?),
276            graph_store: store,
277            catalog: cfg.catalog,
278            #[cfg(feature = "rdf")]
279            rdf_store: Arc::new(RdfStore::new()),
280            transaction_manager: cfg.transaction_manager,
281            query_cache: cfg.query_cache,
282            current_transaction: parking_lot::Mutex::new(None),
283            read_only_tx: parking_lot::Mutex::new(cfg.read_only),
284            db_read_only: cfg.read_only,
285            auto_commit: true,
286            adaptive_config: cfg.adaptive_config,
287            factorized_execution: cfg.factorized_execution,
288            graph_model: cfg.graph_model,
289            query_timeout: cfg.query_timeout,
290            commit_counter: cfg.commit_counter,
291            gc_interval: cfg.gc_interval,
292            transaction_start_node_count: AtomicUsize::new(0),
293            transaction_start_edge_count: AtomicUsize::new(0),
294            #[cfg(feature = "wal")]
295            wal: None,
296            #[cfg(feature = "wal")]
297            wal_graph_context: None,
298            #[cfg(feature = "cdc")]
299            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
300            current_graph: parking_lot::Mutex::new(None),
301            current_schema: parking_lot::Mutex::new(None),
302            time_zone: parking_lot::Mutex::new(None),
303            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
304            viewing_epoch_override: parking_lot::Mutex::new(None),
305            savepoints: parking_lot::Mutex::new(Vec::new()),
306            transaction_nesting_depth: parking_lot::Mutex::new(0),
307            touched_graphs: parking_lot::Mutex::new(Vec::new()),
308            #[cfg(feature = "metrics")]
309            metrics: None,
310            #[cfg(feature = "metrics")]
311            tx_start_time: parking_lot::Mutex::new(None),
312        })
313    }
314
    /// Returns the graph model this session operates on.
    ///
    /// The value is fixed at construction time from `SessionConfig::graph_model`.
    #[must_use]
    pub fn graph_model(&self) -> GraphModel {
        self.graph_model
    }
320
321    // === Session State Management ===
322
323    /// Sets the current graph for this session (USE GRAPH).
324    pub fn use_graph(&self, name: &str) {
325        *self.current_graph.lock() = Some(name.to_string());
326    }
327
328    /// Returns the current graph name, if any.
329    #[must_use]
330    pub fn current_graph(&self) -> Option<String> {
331        self.current_graph.lock().clone()
332    }
333
334    /// Sets the current schema for this session (SESSION SET SCHEMA).
335    ///
336    /// Per ISO/IEC 39075 Section 7.1 GR1, this is independent of the session graph.
337    pub fn set_schema(&self, name: &str) {
338        *self.current_schema.lock() = Some(name.to_string());
339    }
340
341    /// Returns the current schema name, if any.
342    ///
343    /// `None` means "not set", which resolves to the default schema.
344    #[must_use]
345    pub fn current_schema(&self) -> Option<String> {
346        self.current_schema.lock().clone()
347    }
348
349    /// Computes the effective storage key for a graph, accounting for schema context.
350    ///
351    /// Per ISO/IEC 39075 Section 17.2, graphs resolve relative to the current schema.
352    /// Uses `/` as separator since it is invalid in GQL identifiers.
353    fn effective_graph_key(&self, graph_name: &str) -> String {
354        let schema = self.current_schema.lock().clone();
355        match schema {
356            Some(s) => format!("{s}/{graph_name}"),
357            None => graph_name.to_string(),
358        }
359    }
360
361    /// Computes the effective storage key for a type, accounting for schema context.
362    ///
363    /// Mirrors `effective_graph_key()`: types resolve relative to the current schema.
364    fn effective_type_key(&self, type_name: &str) -> String {
365        let schema = self.current_schema.lock().clone();
366        match schema {
367            Some(s) => format!("{s}/{type_name}"),
368            None => type_name.to_string(),
369        }
370    }
371
372    /// Returns the effective storage key for the current graph, accounting for schema.
373    ///
374    /// Combines `current_schema` and `current_graph` into a flat lookup key.
375    fn active_graph_storage_key(&self) -> Option<String> {
376        let graph = self.current_graph.lock().clone();
377        let schema = self.current_schema.lock().clone();
378        match (schema, graph) {
379            (_, None) => None,
380            (_, Some(ref name)) if name.eq_ignore_ascii_case("default") => None,
381            (None, Some(name)) => Some(name),
382            (Some(s), Some(g)) => Some(format!("{s}/{g}")),
383        }
384    }
385
386    /// Returns the graph store for the currently active graph.
387    ///
388    /// If `current_graph` is `None` or `"default"`, returns the session's
389    /// default `graph_store` (already WAL-wrapped for the default graph).
390    /// Otherwise looks up the named graph in the root store and wraps it
391    /// in a [`WalGraphStore`] so mutations are WAL-logged with the correct
392    /// graph context.
393    fn active_store(&self) -> Arc<dyn GraphStoreMut> {
394        let key = self.active_graph_storage_key();
395        match key {
396            None => Arc::clone(&self.graph_store),
397            Some(ref name) => match self.store.graph(name) {
398                Some(named_store) => {
399                    #[cfg(feature = "wal")]
400                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
401                        return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
402                            named_store,
403                            Arc::clone(wal),
404                            name.clone(),
405                            Arc::clone(ctx),
406                        )) as Arc<dyn GraphStoreMut>;
407                    }
408                    named_store as Arc<dyn GraphStoreMut>
409                }
410                None => Arc::clone(&self.graph_store),
411            },
412        }
413    }
414
415    /// Returns the concrete `LpgStore` for the currently active graph.
416    ///
417    /// Used by direct CRUD methods that need the concrete store type
418    /// for versioned operations.
419    fn active_lpg_store(&self) -> Arc<LpgStore> {
420        let key = self.active_graph_storage_key();
421        match key {
422            None => Arc::clone(&self.store),
423            Some(ref name) => self
424                .store
425                .graph(name)
426                .unwrap_or_else(|| Arc::clone(&self.store)),
427        }
428    }
429
430    /// Resolves a graph name to a concrete `LpgStore`.
431    /// `None` and `"default"` resolve to the session's root store.
432    fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
433        match graph_name {
434            None => Arc::clone(&self.store),
435            Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
436            Some(name) => self
437                .store
438                .graph(name)
439                .unwrap_or_else(|| Arc::clone(&self.store)),
440        }
441    }
442
443    /// Records the current graph as "touched" if a transaction is active.
444    ///
445    /// Uses the full storage key (schema/graph) so that commit/rollback
446    /// can resolve the correct store via `resolve_store`.
447    fn track_graph_touch(&self) {
448        if self.current_transaction.lock().is_some() {
449            let key = self.active_graph_storage_key();
450            let mut touched = self.touched_graphs.lock();
451            if !touched.contains(&key) {
452                touched.push(key);
453            }
454        }
455    }
456
457    /// Sets the session time zone.
458    pub fn set_time_zone(&self, tz: &str) {
459        *self.time_zone.lock() = Some(tz.to_string());
460    }
461
462    /// Returns the session time zone, if set.
463    #[must_use]
464    pub fn time_zone(&self) -> Option<String> {
465        self.time_zone.lock().clone()
466    }
467
468    /// Sets a session parameter.
469    pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
470        self.session_params.lock().insert(key.to_string(), value);
471    }
472
473    /// Gets a session parameter by cloning it.
474    #[must_use]
475    pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
476        self.session_params.lock().get(key).cloned()
477    }
478
479    /// Resets all session state to defaults (ISO/IEC 39075 Section 7.2).
480    pub fn reset_session(&self) {
481        *self.current_schema.lock() = None;
482        *self.current_graph.lock() = None;
483        *self.time_zone.lock() = None;
484        self.session_params.lock().clear();
485        *self.viewing_epoch_override.lock() = None;
486    }
487
488    /// Resets only the session schema (Section 7.2 GR1).
489    pub fn reset_schema(&self) {
490        *self.current_schema.lock() = None;
491    }
492
493    /// Resets only the session graph (Section 7.2 GR2).
494    pub fn reset_graph(&self) {
495        *self.current_graph.lock() = None;
496    }
497
498    /// Resets only the session time zone (Section 7.2 GR3).
499    pub fn reset_time_zone(&self) {
500        *self.time_zone.lock() = None;
501    }
502
503    /// Resets only session parameters (Section 7.2 GR4).
504    pub fn reset_parameters(&self) {
505        self.session_params.lock().clear();
506    }
507
508    // --- Time-travel API ---
509
510    /// Sets a viewing epoch override for time-travel queries.
511    ///
512    /// While set, all queries on this session see the database as it existed
513    /// at the given epoch. Use [`clear_viewing_epoch`](Self::clear_viewing_epoch)
514    /// to return to normal behavior.
515    pub fn set_viewing_epoch(&self, epoch: EpochId) {
516        *self.viewing_epoch_override.lock() = Some(epoch);
517    }
518
519    /// Clears the viewing epoch override, returning to normal behavior.
520    pub fn clear_viewing_epoch(&self) {
521        *self.viewing_epoch_override.lock() = None;
522    }
523
524    /// Returns the current viewing epoch override, if any.
525    #[must_use]
526    pub fn viewing_epoch(&self) -> Option<EpochId> {
527        *self.viewing_epoch_override.lock()
528    }
529
530    /// Returns all versions of a node with their creation/deletion epochs.
531    ///
532    /// Properties and labels reflect the current state (not versioned per-epoch).
533    #[must_use]
534    pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
535        self.active_lpg_store().get_node_history(id)
536    }
537
538    /// Returns all versions of an edge with their creation/deletion epochs.
539    ///
540    /// Properties reflect the current state (not versioned per-epoch).
541    #[must_use]
542    pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
543        self.active_lpg_store().get_edge_history(id)
544    }
545
546    /// Checks that the session's graph model supports LPG operations.
547    fn require_lpg(&self, language: &str) -> Result<()> {
548        if self.graph_model == GraphModel::Rdf {
549            return Err(grafeo_common::utils::error::Error::Internal(format!(
550                "This is an RDF database. {language} queries require an LPG database."
551            )));
552        }
553        Ok(())
554    }
555
556    /// Executes a session or transaction command, returning an empty result.
557    #[cfg(feature = "gql")]
558    fn execute_session_command(
559        &self,
560        cmd: grafeo_adapters::query::gql::ast::SessionCommand,
561    ) -> Result<QueryResult> {
562        use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
563        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
564
565        // Block DDL in read-only transactions (ISO/IEC 39075 Section 8)
566        if *self.read_only_tx.lock() {
567            match &cmd {
568                SessionCommand::CreateGraph { .. } | SessionCommand::DropGraph { .. } => {
569                    return Err(Error::Transaction(
570                        grafeo_common::utils::error::TransactionError::ReadOnly,
571                    ));
572                }
573                _ => {} // Session state + transaction control allowed
574            }
575        }
576
577        match cmd {
578            SessionCommand::CreateGraph {
579                name,
580                if_not_exists,
581                typed,
582                like_graph,
583                copy_of,
584                open: _,
585            } => {
586                // ISO/IEC 39075 Section 12.4: graphs are created within the current schema
587                let storage_key = self.effective_graph_key(&name);
588
589                // Validate source graph exists for LIKE / AS COPY OF
590                if let Some(ref src) = like_graph {
591                    let src_key = self.effective_graph_key(src);
592                    if self.store.graph(&src_key).is_none() {
593                        return Err(Error::Query(QueryError::new(
594                            QueryErrorKind::Semantic,
595                            format!("Source graph '{src}' does not exist"),
596                        )));
597                    }
598                }
599                if let Some(ref src) = copy_of {
600                    let src_key = self.effective_graph_key(src);
601                    if self.store.graph(&src_key).is_none() {
602                        return Err(Error::Query(QueryError::new(
603                            QueryErrorKind::Semantic,
604                            format!("Source graph '{src}' does not exist"),
605                        )));
606                    }
607                }
608
609                let created = self
610                    .store
611                    .create_graph(&storage_key)
612                    .map_err(|e| Error::Internal(e.to_string()))?;
613                if !created && !if_not_exists {
614                    return Err(Error::Query(QueryError::new(
615                        QueryErrorKind::Semantic,
616                        format!("Graph '{name}' already exists"),
617                    )));
618                }
619                if created {
620                    #[cfg(feature = "wal")]
621                    self.log_schema_wal(
622                        &grafeo_adapters::storage::wal::WalRecord::CreateNamedGraph {
623                            name: storage_key.clone(),
624                        },
625                    );
626                }
627
628                // AS COPY OF: copy data from source graph
629                if let Some(ref src) = copy_of {
630                    let src_key = self.effective_graph_key(src);
631                    self.store
632                        .copy_graph(Some(&src_key), Some(&storage_key))
633                        .map_err(|e| Error::Internal(e.to_string()))?;
634                }
635
636                // Bind to graph type if specified.
637                // If the parser produced a '/' in the name it is already a qualified
638                // "schema/type" key; otherwise resolve against the current schema.
639                if let Some(type_name) = typed
640                    && let Err(e) = self.catalog.bind_graph_type(
641                        &storage_key,
642                        if type_name.contains('/') {
643                            type_name.clone()
644                        } else {
645                            self.effective_type_key(&type_name)
646                        },
647                    )
648                {
649                    return Err(Error::Query(QueryError::new(
650                        QueryErrorKind::Semantic,
651                        e.to_string(),
652                    )));
653                }
654
655                // LIKE: copy graph type binding from source
656                if let Some(ref src) = like_graph {
657                    let src_key = self.effective_graph_key(src);
658                    if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
659                        let _ = self.catalog.bind_graph_type(&storage_key, src_type);
660                    }
661                }
662
663                Ok(QueryResult::empty())
664            }
665            SessionCommand::DropGraph { name, if_exists } => {
666                let storage_key = self.effective_graph_key(&name);
667                let dropped = self.store.drop_graph(&storage_key);
668                if !dropped && !if_exists {
669                    return Err(Error::Query(QueryError::new(
670                        QueryErrorKind::Semantic,
671                        format!("Graph '{name}' does not exist"),
672                    )));
673                }
674                if dropped {
675                    #[cfg(feature = "wal")]
676                    self.log_schema_wal(
677                        &grafeo_adapters::storage::wal::WalRecord::DropNamedGraph {
678                            name: storage_key.clone(),
679                        },
680                    );
681                    // If this session was using the dropped graph, reset to default
682                    let mut current = self.current_graph.lock();
683                    if current
684                        .as_deref()
685                        .is_some_and(|g| g.eq_ignore_ascii_case(&name))
686                    {
687                        *current = None;
688                    }
689                }
690                Ok(QueryResult::empty())
691            }
692            SessionCommand::UseGraph(name) => {
693                // Verify graph exists (resolve within current schema)
694                let effective_key = self.effective_graph_key(&name);
695                if !name.eq_ignore_ascii_case("default")
696                    && self.store.graph(&effective_key).is_none()
697                {
698                    return Err(Error::Query(QueryError::new(
699                        QueryErrorKind::Semantic,
700                        format!("Graph '{name}' does not exist"),
701                    )));
702                }
703                self.use_graph(&name);
704                // Track the new graph if in a transaction
705                self.track_graph_touch();
706                Ok(QueryResult::empty())
707            }
708            SessionCommand::SessionSetGraph(name) => {
709                // ISO/IEC 39075 Section 7.1 GR2: set session graph (resolved within current schema)
710                let effective_key = self.effective_graph_key(&name);
711                if !name.eq_ignore_ascii_case("default")
712                    && self.store.graph(&effective_key).is_none()
713                {
714                    return Err(Error::Query(QueryError::new(
715                        QueryErrorKind::Semantic,
716                        format!("Graph '{name}' does not exist"),
717                    )));
718                }
719                self.use_graph(&name);
720                // Track the new graph if in a transaction
721                self.track_graph_touch();
722                Ok(QueryResult::empty())
723            }
724            SessionCommand::SessionSetSchema(name) => {
725                // ISO/IEC 39075 Section 7.1 GR1: set session schema (independent of graph)
726                if !self.catalog.schema_exists(&name) {
727                    return Err(Error::Query(QueryError::new(
728                        QueryErrorKind::Semantic,
729                        format!("Schema '{name}' does not exist"),
730                    )));
731                }
732                self.set_schema(&name);
733                Ok(QueryResult::empty())
734            }
735            SessionCommand::SessionSetTimeZone(tz) => {
736                self.set_time_zone(&tz);
737                Ok(QueryResult::empty())
738            }
739            SessionCommand::SessionSetParameter(key, expr) => {
740                if key.eq_ignore_ascii_case("viewing_epoch") {
741                    match Self::eval_integer_literal(&expr) {
742                        Some(n) if n >= 0 => {
743                            self.set_viewing_epoch(EpochId::new(n as u64));
744                            Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
745                        }
746                        _ => Err(Error::Query(QueryError::new(
747                            QueryErrorKind::Semantic,
748                            "viewing_epoch must be a non-negative integer literal",
749                        ))),
750                    }
751                } else {
752                    // For now, store parameter name with Null value.
753                    // Full expression evaluation would require building and executing a plan.
754                    self.set_parameter(&key, Value::Null);
755                    Ok(QueryResult::empty())
756                }
757            }
758            SessionCommand::SessionReset(target) => {
759                use grafeo_adapters::query::gql::ast::SessionResetTarget;
760                match target {
761                    SessionResetTarget::All => self.reset_session(),
762                    SessionResetTarget::Schema => self.reset_schema(),
763                    SessionResetTarget::Graph => self.reset_graph(),
764                    SessionResetTarget::TimeZone => self.reset_time_zone(),
765                    SessionResetTarget::Parameters => self.reset_parameters(),
766                }
767                Ok(QueryResult::empty())
768            }
769            SessionCommand::SessionClose => {
770                self.reset_session();
771                Ok(QueryResult::empty())
772            }
773            SessionCommand::StartTransaction {
774                read_only,
775                isolation_level,
776            } => {
777                let engine_level = isolation_level.map(|l| match l {
778                    TransactionIsolationLevel::ReadCommitted => {
779                        crate::transaction::IsolationLevel::ReadCommitted
780                    }
781                    TransactionIsolationLevel::SnapshotIsolation => {
782                        crate::transaction::IsolationLevel::SnapshotIsolation
783                    }
784                    TransactionIsolationLevel::Serializable => {
785                        crate::transaction::IsolationLevel::Serializable
786                    }
787                });
788                self.begin_transaction_inner(read_only, engine_level)?;
789                Ok(QueryResult::status("Transaction started"))
790            }
791            SessionCommand::Commit => {
792                self.commit_inner()?;
793                Ok(QueryResult::status("Transaction committed"))
794            }
795            SessionCommand::Rollback => {
796                self.rollback_inner()?;
797                Ok(QueryResult::status("Transaction rolled back"))
798            }
799            SessionCommand::Savepoint(name) => {
800                self.savepoint(&name)?;
801                Ok(QueryResult::status(format!("Savepoint '{name}' created")))
802            }
803            SessionCommand::RollbackToSavepoint(name) => {
804                self.rollback_to_savepoint(&name)?;
805                Ok(QueryResult::status(format!(
806                    "Rolled back to savepoint '{name}'"
807                )))
808            }
809            SessionCommand::ReleaseSavepoint(name) => {
810                self.release_savepoint(&name)?;
811                Ok(QueryResult::status(format!("Savepoint '{name}' released")))
812            }
813        }
814    }
815
816    /// Logs a WAL record for a schema change (no-op if WAL is not enabled).
817    #[cfg(feature = "wal")]
818    fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
819        if let Some(ref wal) = self.wal
820            && let Err(e) = wal.log(record)
821        {
822            grafeo_warn!("Failed to log schema change to WAL: {}", e);
823        }
824    }
825
826    /// Executes a schema DDL command, returning a status result.
827    #[cfg(feature = "gql")]
828    fn execute_schema_command(
829        &self,
830        cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
831    ) -> Result<QueryResult> {
832        use crate::catalog::{
833            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
834        };
835        use grafeo_adapters::query::gql::ast::SchemaStatement;
836        #[cfg(feature = "wal")]
837        use grafeo_adapters::storage::wal::WalRecord;
838        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
839
840        /// Logs a WAL record for schema changes. Compiles to nothing without `wal`.
841        macro_rules! wal_log {
842            ($self:expr, $record:expr) => {
843                #[cfg(feature = "wal")]
844                $self.log_schema_wal(&$record);
845            };
846        }
847
848        let result = match cmd {
849            SchemaStatement::CreateNodeType(stmt) => {
850                let effective_name = self.effective_type_key(&stmt.name);
851                #[cfg(feature = "wal")]
852                let props_for_wal: Vec<(String, String, bool)> = stmt
853                    .properties
854                    .iter()
855                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
856                    .collect();
857                let def = NodeTypeDefinition {
858                    name: effective_name.clone(),
859                    properties: stmt
860                        .properties
861                        .iter()
862                        .map(|p| TypedProperty {
863                            name: p.name.clone(),
864                            data_type: PropertyDataType::from_type_name(&p.data_type),
865                            nullable: p.nullable,
866                            default_value: p
867                                .default_value
868                                .as_ref()
869                                .map(|s| parse_default_literal(s)),
870                        })
871                        .collect(),
872                    constraints: Vec::new(),
873                    parent_types: stmt.parent_types.clone(),
874                };
875                let result = if stmt.or_replace {
876                    let _ = self.catalog.drop_node_type(&effective_name);
877                    self.catalog.register_node_type(def)
878                } else {
879                    self.catalog.register_node_type(def)
880                };
881                match result {
882                    Ok(()) => {
883                        wal_log!(
884                            self,
885                            WalRecord::CreateNodeType {
886                                name: effective_name.clone(),
887                                properties: props_for_wal,
888                                constraints: Vec::new(),
889                            }
890                        );
891                        Ok(QueryResult::status(format!(
892                            "Created node type '{}'",
893                            stmt.name
894                        )))
895                    }
896                    Err(e) if stmt.if_not_exists => {
897                        let _ = e;
898                        Ok(QueryResult::status("No change"))
899                    }
900                    Err(e) => Err(Error::Query(QueryError::new(
901                        QueryErrorKind::Semantic,
902                        e.to_string(),
903                    ))),
904                }
905            }
906            SchemaStatement::CreateEdgeType(stmt) => {
907                let effective_name = self.effective_type_key(&stmt.name);
908                #[cfg(feature = "wal")]
909                let props_for_wal: Vec<(String, String, bool)> = stmt
910                    .properties
911                    .iter()
912                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
913                    .collect();
914                let def = EdgeTypeDefinition {
915                    name: effective_name.clone(),
916                    properties: stmt
917                        .properties
918                        .iter()
919                        .map(|p| TypedProperty {
920                            name: p.name.clone(),
921                            data_type: PropertyDataType::from_type_name(&p.data_type),
922                            nullable: p.nullable,
923                            default_value: p
924                                .default_value
925                                .as_ref()
926                                .map(|s| parse_default_literal(s)),
927                        })
928                        .collect(),
929                    constraints: Vec::new(),
930                    source_node_types: stmt.source_node_types.clone(),
931                    target_node_types: stmt.target_node_types.clone(),
932                };
933                let result = if stmt.or_replace {
934                    let _ = self.catalog.drop_edge_type_def(&effective_name);
935                    self.catalog.register_edge_type_def(def)
936                } else {
937                    self.catalog.register_edge_type_def(def)
938                };
939                match result {
940                    Ok(()) => {
941                        wal_log!(
942                            self,
943                            WalRecord::CreateEdgeType {
944                                name: effective_name.clone(),
945                                properties: props_for_wal,
946                                constraints: Vec::new(),
947                            }
948                        );
949                        Ok(QueryResult::status(format!(
950                            "Created edge type '{}'",
951                            stmt.name
952                        )))
953                    }
954                    Err(e) if stmt.if_not_exists => {
955                        let _ = e;
956                        Ok(QueryResult::status("No change"))
957                    }
958                    Err(e) => Err(Error::Query(QueryError::new(
959                        QueryErrorKind::Semantic,
960                        e.to_string(),
961                    ))),
962                }
963            }
964            SchemaStatement::CreateVectorIndex(stmt) => {
965                Self::create_vector_index_on_store(
966                    &self.active_lpg_store(),
967                    &stmt.node_label,
968                    &stmt.property,
969                    stmt.dimensions,
970                    stmt.metric.as_deref(),
971                )?;
972                wal_log!(
973                    self,
974                    WalRecord::CreateIndex {
975                        name: stmt.name.clone(),
976                        label: stmt.node_label.clone(),
977                        property: stmt.property.clone(),
978                        index_type: "vector".to_string(),
979                    }
980                );
981                Ok(QueryResult::status(format!(
982                    "Created vector index '{}'",
983                    stmt.name
984                )))
985            }
986            SchemaStatement::DropNodeType { name, if_exists } => {
987                let effective_name = self.effective_type_key(&name);
988                match self.catalog.drop_node_type(&effective_name) {
989                    Ok(()) => {
990                        wal_log!(
991                            self,
992                            WalRecord::DropNodeType {
993                                name: effective_name
994                            }
995                        );
996                        Ok(QueryResult::status(format!("Dropped node type '{name}'")))
997                    }
998                    Err(e) if if_exists => {
999                        let _ = e;
1000                        Ok(QueryResult::status("No change"))
1001                    }
1002                    Err(e) => Err(Error::Query(QueryError::new(
1003                        QueryErrorKind::Semantic,
1004                        e.to_string(),
1005                    ))),
1006                }
1007            }
1008            SchemaStatement::DropEdgeType { name, if_exists } => {
1009                let effective_name = self.effective_type_key(&name);
1010                match self.catalog.drop_edge_type_def(&effective_name) {
1011                    Ok(()) => {
1012                        wal_log!(
1013                            self,
1014                            WalRecord::DropEdgeType {
1015                                name: effective_name
1016                            }
1017                        );
1018                        Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
1019                    }
1020                    Err(e) if if_exists => {
1021                        let _ = e;
1022                        Ok(QueryResult::status("No change"))
1023                    }
1024                    Err(e) => Err(Error::Query(QueryError::new(
1025                        QueryErrorKind::Semantic,
1026                        e.to_string(),
1027                    ))),
1028                }
1029            }
1030            SchemaStatement::CreateIndex(stmt) => {
1031                use grafeo_adapters::query::gql::ast::IndexKind;
1032                let active = self.active_lpg_store();
1033                let index_type_str = match stmt.index_kind {
1034                    IndexKind::Property => "property",
1035                    IndexKind::BTree => "btree",
1036                    IndexKind::Text => "text",
1037                    IndexKind::Vector => "vector",
1038                };
1039                match stmt.index_kind {
1040                    IndexKind::Property | IndexKind::BTree => {
1041                        for prop in &stmt.properties {
1042                            active.create_property_index(prop);
1043                        }
1044                    }
1045                    IndexKind::Text => {
1046                        for prop in &stmt.properties {
1047                            Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1048                        }
1049                    }
1050                    IndexKind::Vector => {
1051                        for prop in &stmt.properties {
1052                            Self::create_vector_index_on_store(
1053                                &active,
1054                                &stmt.label,
1055                                prop,
1056                                stmt.options.dimensions,
1057                                stmt.options.metric.as_deref(),
1058                            )?;
1059                        }
1060                    }
1061                }
1062                #[cfg(feature = "wal")]
1063                for prop in &stmt.properties {
1064                    wal_log!(
1065                        self,
1066                        WalRecord::CreateIndex {
1067                            name: stmt.name.clone(),
1068                            label: stmt.label.clone(),
1069                            property: prop.clone(),
1070                            index_type: index_type_str.to_string(),
1071                        }
1072                    );
1073                }
1074                Ok(QueryResult::status(format!(
1075                    "Created {} index '{}'",
1076                    index_type_str, stmt.name
1077                )))
1078            }
1079            SchemaStatement::DropIndex { name, if_exists } => {
1080                // Try to drop property index by name
1081                let dropped = self.active_lpg_store().drop_property_index(&name);
1082                if dropped || if_exists {
1083                    if dropped {
1084                        wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1085                    }
1086                    Ok(QueryResult::status(if dropped {
1087                        format!("Dropped index '{name}'")
1088                    } else {
1089                        "No change".to_string()
1090                    }))
1091                } else {
1092                    Err(Error::Query(QueryError::new(
1093                        QueryErrorKind::Semantic,
1094                        format!("Index '{name}' does not exist"),
1095                    )))
1096                }
1097            }
1098            SchemaStatement::CreateConstraint(stmt) => {
1099                use crate::catalog::TypeConstraint;
1100                use grafeo_adapters::query::gql::ast::ConstraintKind;
1101                let kind_str = match stmt.constraint_kind {
1102                    ConstraintKind::Unique => "unique",
1103                    ConstraintKind::NodeKey => "node_key",
1104                    ConstraintKind::NotNull => "not_null",
1105                    ConstraintKind::Exists => "exists",
1106                };
1107                let constraint_name = stmt
1108                    .name
1109                    .clone()
1110                    .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1111
1112                // Register constraint in catalog type definitions
1113                match stmt.constraint_kind {
1114                    ConstraintKind::Unique => {
1115                        for prop in &stmt.properties {
1116                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1117                            let prop_id = self.catalog.get_or_create_property_key(prop);
1118                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1119                        }
1120                        let _ = self.catalog.add_constraint_to_type(
1121                            &stmt.label,
1122                            TypeConstraint::Unique(stmt.properties.clone()),
1123                        );
1124                    }
1125                    ConstraintKind::NodeKey => {
1126                        for prop in &stmt.properties {
1127                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1128                            let prop_id = self.catalog.get_or_create_property_key(prop);
1129                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1130                            let _ = self.catalog.add_required_property(label_id, prop_id);
1131                        }
1132                        let _ = self.catalog.add_constraint_to_type(
1133                            &stmt.label,
1134                            TypeConstraint::PrimaryKey(stmt.properties.clone()),
1135                        );
1136                    }
1137                    ConstraintKind::NotNull | ConstraintKind::Exists => {
1138                        for prop in &stmt.properties {
1139                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1140                            let prop_id = self.catalog.get_or_create_property_key(prop);
1141                            let _ = self.catalog.add_required_property(label_id, prop_id);
1142                            let _ = self.catalog.add_constraint_to_type(
1143                                &stmt.label,
1144                                TypeConstraint::NotNull(prop.clone()),
1145                            );
1146                        }
1147                    }
1148                }
1149
1150                wal_log!(
1151                    self,
1152                    WalRecord::CreateConstraint {
1153                        name: constraint_name.clone(),
1154                        label: stmt.label.clone(),
1155                        properties: stmt.properties.clone(),
1156                        kind: kind_str.to_string(),
1157                    }
1158                );
1159                Ok(QueryResult::status(format!(
1160                    "Created {kind_str} constraint '{constraint_name}'"
1161                )))
1162            }
1163            SchemaStatement::DropConstraint { name, if_exists } => {
1164                let _ = if_exists;
1165                wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1166                Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1167            }
1168            SchemaStatement::CreateGraphType(stmt) => {
1169                use crate::catalog::GraphTypeDefinition;
1170                use grafeo_adapters::query::gql::ast::InlineElementType;
1171
1172                let effective_name = self.effective_type_key(&stmt.name);
1173
1174                // GG04: LIKE clause copies type from existing graph
1175                let (mut node_types, mut edge_types, open) =
1176                    if let Some(ref like_graph) = stmt.like_graph {
1177                        // Infer types from the graph's bound type, or use its existing types
1178                        if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1179                            if let Some(existing) = self
1180                                .catalog
1181                                .schema()
1182                                .and_then(|s| s.get_graph_type(&type_name))
1183                            {
1184                                (
1185                                    existing.allowed_node_types.clone(),
1186                                    existing.allowed_edge_types.clone(),
1187                                    existing.open,
1188                                )
1189                            } else {
1190                                (Vec::new(), Vec::new(), true)
1191                            }
1192                        } else {
1193                            // GG22: Infer from graph data (labels used in graph)
1194                            let nt = self.catalog.all_node_type_names();
1195                            let et = self.catalog.all_edge_type_names();
1196                            if nt.is_empty() && et.is_empty() {
1197                                (Vec::new(), Vec::new(), true)
1198                            } else {
1199                                (nt, et, false)
1200                            }
1201                        }
1202                    } else {
1203                        // Prefix element type names with schema for consistency
1204                        let nt = stmt
1205                            .node_types
1206                            .iter()
1207                            .map(|n| self.effective_type_key(n))
1208                            .collect();
1209                        let et = stmt
1210                            .edge_types
1211                            .iter()
1212                            .map(|n| self.effective_type_key(n))
1213                            .collect();
1214                        (nt, et, stmt.open)
1215                    };
1216
1217                // GG03: Register inline element types and add their names
1218                for inline in &stmt.inline_types {
1219                    match inline {
1220                        InlineElementType::Node {
1221                            name,
1222                            properties,
1223                            key_labels,
1224                            ..
1225                        } => {
1226                            let inline_effective = self.effective_type_key(name);
1227                            let def = NodeTypeDefinition {
1228                                name: inline_effective.clone(),
1229                                properties: properties
1230                                    .iter()
1231                                    .map(|p| TypedProperty {
1232                                        name: p.name.clone(),
1233                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1234                                        nullable: p.nullable,
1235                                        default_value: None,
1236                                    })
1237                                    .collect(),
1238                                constraints: Vec::new(),
1239                                parent_types: key_labels.clone(),
1240                            };
1241                            // Register or replace so inline defs override existing
1242                            self.catalog.register_or_replace_node_type(def);
1243                            #[cfg(feature = "wal")]
1244                            {
1245                                let props_for_wal: Vec<(String, String, bool)> = properties
1246                                    .iter()
1247                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1248                                    .collect();
1249                                self.log_schema_wal(&WalRecord::CreateNodeType {
1250                                    name: inline_effective.clone(),
1251                                    properties: props_for_wal,
1252                                    constraints: Vec::new(),
1253                                });
1254                            }
1255                            if !node_types.contains(&inline_effective) {
1256                                node_types.push(inline_effective);
1257                            }
1258                        }
1259                        InlineElementType::Edge {
1260                            name,
1261                            properties,
1262                            source_node_types,
1263                            target_node_types,
1264                            ..
1265                        } => {
1266                            let inline_effective = self.effective_type_key(name);
1267                            let def = EdgeTypeDefinition {
1268                                name: inline_effective.clone(),
1269                                properties: properties
1270                                    .iter()
1271                                    .map(|p| TypedProperty {
1272                                        name: p.name.clone(),
1273                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1274                                        nullable: p.nullable,
1275                                        default_value: None,
1276                                    })
1277                                    .collect(),
1278                                constraints: Vec::new(),
1279                                source_node_types: source_node_types.clone(),
1280                                target_node_types: target_node_types.clone(),
1281                            };
1282                            self.catalog.register_or_replace_edge_type_def(def);
1283                            #[cfg(feature = "wal")]
1284                            {
1285                                let props_for_wal: Vec<(String, String, bool)> = properties
1286                                    .iter()
1287                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1288                                    .collect();
1289                                self.log_schema_wal(&WalRecord::CreateEdgeType {
1290                                    name: inline_effective.clone(),
1291                                    properties: props_for_wal,
1292                                    constraints: Vec::new(),
1293                                });
1294                            }
1295                            if !edge_types.contains(&inline_effective) {
1296                                edge_types.push(inline_effective);
1297                            }
1298                        }
1299                    }
1300                }
1301
1302                let def = GraphTypeDefinition {
1303                    name: effective_name.clone(),
1304                    allowed_node_types: node_types.clone(),
1305                    allowed_edge_types: edge_types.clone(),
1306                    open,
1307                };
1308                let result = if stmt.or_replace {
1309                    // Drop existing first, ignore error if not found
1310                    let _ = self.catalog.drop_graph_type(&effective_name);
1311                    self.catalog.register_graph_type(def)
1312                } else {
1313                    self.catalog.register_graph_type(def)
1314                };
1315                match result {
1316                    Ok(()) => {
1317                        wal_log!(
1318                            self,
1319                            WalRecord::CreateGraphType {
1320                                name: effective_name.clone(),
1321                                node_types,
1322                                edge_types,
1323                                open,
1324                            }
1325                        );
1326                        Ok(QueryResult::status(format!(
1327                            "Created graph type '{}'",
1328                            stmt.name
1329                        )))
1330                    }
1331                    Err(e) if stmt.if_not_exists => {
1332                        let _ = e;
1333                        Ok(QueryResult::status("No change"))
1334                    }
1335                    Err(e) => Err(Error::Query(QueryError::new(
1336                        QueryErrorKind::Semantic,
1337                        e.to_string(),
1338                    ))),
1339                }
1340            }
1341            SchemaStatement::DropGraphType { name, if_exists } => {
1342                let effective_name = self.effective_type_key(&name);
1343                match self.catalog.drop_graph_type(&effective_name) {
1344                    Ok(()) => {
1345                        wal_log!(
1346                            self,
1347                            WalRecord::DropGraphType {
1348                                name: effective_name
1349                            }
1350                        );
1351                        Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1352                    }
1353                    Err(e) if if_exists => {
1354                        let _ = e;
1355                        Ok(QueryResult::status("No change"))
1356                    }
1357                    Err(e) => Err(Error::Query(QueryError::new(
1358                        QueryErrorKind::Semantic,
1359                        e.to_string(),
1360                    ))),
1361                }
1362            }
1363            SchemaStatement::CreateSchema {
1364                name,
1365                if_not_exists,
1366            } => match self.catalog.register_schema_namespace(name.clone()) {
1367                Ok(()) => {
1368                    wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1369                    Ok(QueryResult::status(format!("Created schema '{name}'")))
1370                }
1371                Err(e) if if_not_exists => {
1372                    let _ = e;
1373                    Ok(QueryResult::status("No change"))
1374                }
1375                Err(e) => Err(Error::Query(QueryError::new(
1376                    QueryErrorKind::Semantic,
1377                    e.to_string(),
1378                ))),
1379            },
1380            SchemaStatement::DropSchema { name, if_exists } => {
1381                // ISO/IEC 39075 Section 12.3: schema must be empty before dropping
1382                let prefix = format!("{name}/");
1383                let has_graphs = self
1384                    .store
1385                    .graph_names()
1386                    .iter()
1387                    .any(|g| g.starts_with(&prefix));
1388                let has_types = self
1389                    .catalog
1390                    .all_node_type_names()
1391                    .iter()
1392                    .any(|n| n.starts_with(&prefix))
1393                    || self
1394                        .catalog
1395                        .all_edge_type_names()
1396                        .iter()
1397                        .any(|n| n.starts_with(&prefix))
1398                    || self
1399                        .catalog
1400                        .all_graph_type_names()
1401                        .iter()
1402                        .any(|n| n.starts_with(&prefix));
1403                if has_graphs || has_types {
1404                    return Err(Error::Query(QueryError::new(
1405                        QueryErrorKind::Semantic,
1406                        format!("Schema '{name}' is not empty: drop all graphs and types first"),
1407                    )));
1408                }
1409                match self.catalog.drop_schema_namespace(&name) {
1410                    Ok(()) => {
1411                        wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1412                        // If this session was using the dropped schema, reset it
1413                        let mut current = self.current_schema.lock();
1414                        if current
1415                            .as_deref()
1416                            .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1417                        {
1418                            *current = None;
1419                        }
1420                        Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1421                    }
1422                    Err(e) if if_exists => {
1423                        let _ = e;
1424                        Ok(QueryResult::status("No change"))
1425                    }
1426                    Err(e) => Err(Error::Query(QueryError::new(
1427                        QueryErrorKind::Semantic,
1428                        e.to_string(),
1429                    ))),
1430                }
1431            }
1432            SchemaStatement::AlterNodeType(stmt) => {
1433                use grafeo_adapters::query::gql::ast::TypeAlteration;
1434                let effective_name = self.effective_type_key(&stmt.name);
1435                let mut wal_alts = Vec::new();
1436                for alt in &stmt.alterations {
1437                    match alt {
1438                        TypeAlteration::AddProperty(prop) => {
1439                            let typed = TypedProperty {
1440                                name: prop.name.clone(),
1441                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1442                                nullable: prop.nullable,
1443                                default_value: prop
1444                                    .default_value
1445                                    .as_ref()
1446                                    .map(|s| parse_default_literal(s)),
1447                            };
1448                            self.catalog
1449                                .alter_node_type_add_property(&effective_name, typed)
1450                                .map_err(|e| {
1451                                    Error::Query(QueryError::new(
1452                                        QueryErrorKind::Semantic,
1453                                        e.to_string(),
1454                                    ))
1455                                })?;
1456                            wal_alts.push((
1457                                "add".to_string(),
1458                                prop.name.clone(),
1459                                prop.data_type.clone(),
1460                                prop.nullable,
1461                            ));
1462                        }
1463                        TypeAlteration::DropProperty(name) => {
1464                            self.catalog
1465                                .alter_node_type_drop_property(&effective_name, name)
1466                                .map_err(|e| {
1467                                    Error::Query(QueryError::new(
1468                                        QueryErrorKind::Semantic,
1469                                        e.to_string(),
1470                                    ))
1471                                })?;
1472                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1473                        }
1474                    }
1475                }
1476                wal_log!(
1477                    self,
1478                    WalRecord::AlterNodeType {
1479                        name: effective_name,
1480                        alterations: wal_alts,
1481                    }
1482                );
1483                Ok(QueryResult::status(format!(
1484                    "Altered node type '{}'",
1485                    stmt.name
1486                )))
1487            }
1488            SchemaStatement::AlterEdgeType(stmt) => {
1489                use grafeo_adapters::query::gql::ast::TypeAlteration;
1490                let effective_name = self.effective_type_key(&stmt.name);
1491                let mut wal_alts = Vec::new();
1492                for alt in &stmt.alterations {
1493                    match alt {
1494                        TypeAlteration::AddProperty(prop) => {
1495                            let typed = TypedProperty {
1496                                name: prop.name.clone(),
1497                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1498                                nullable: prop.nullable,
1499                                default_value: prop
1500                                    .default_value
1501                                    .as_ref()
1502                                    .map(|s| parse_default_literal(s)),
1503                            };
1504                            self.catalog
1505                                .alter_edge_type_add_property(&effective_name, typed)
1506                                .map_err(|e| {
1507                                    Error::Query(QueryError::new(
1508                                        QueryErrorKind::Semantic,
1509                                        e.to_string(),
1510                                    ))
1511                                })?;
1512                            wal_alts.push((
1513                                "add".to_string(),
1514                                prop.name.clone(),
1515                                prop.data_type.clone(),
1516                                prop.nullable,
1517                            ));
1518                        }
1519                        TypeAlteration::DropProperty(name) => {
1520                            self.catalog
1521                                .alter_edge_type_drop_property(&effective_name, name)
1522                                .map_err(|e| {
1523                                    Error::Query(QueryError::new(
1524                                        QueryErrorKind::Semantic,
1525                                        e.to_string(),
1526                                    ))
1527                                })?;
1528                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1529                        }
1530                    }
1531                }
1532                wal_log!(
1533                    self,
1534                    WalRecord::AlterEdgeType {
1535                        name: effective_name,
1536                        alterations: wal_alts,
1537                    }
1538                );
1539                Ok(QueryResult::status(format!(
1540                    "Altered edge type '{}'",
1541                    stmt.name
1542                )))
1543            }
1544            SchemaStatement::AlterGraphType(stmt) => {
1545                use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1546                let effective_name = self.effective_type_key(&stmt.name);
1547                let mut wal_alts = Vec::new();
1548                for alt in &stmt.alterations {
1549                    match alt {
1550                        GraphTypeAlteration::AddNodeType(name) => {
1551                            self.catalog
1552                                .alter_graph_type_add_node_type(&effective_name, name.clone())
1553                                .map_err(|e| {
1554                                    Error::Query(QueryError::new(
1555                                        QueryErrorKind::Semantic,
1556                                        e.to_string(),
1557                                    ))
1558                                })?;
1559                            wal_alts.push(("add_node_type".to_string(), name.clone()));
1560                        }
1561                        GraphTypeAlteration::DropNodeType(name) => {
1562                            self.catalog
1563                                .alter_graph_type_drop_node_type(&effective_name, name)
1564                                .map_err(|e| {
1565                                    Error::Query(QueryError::new(
1566                                        QueryErrorKind::Semantic,
1567                                        e.to_string(),
1568                                    ))
1569                                })?;
1570                            wal_alts.push(("drop_node_type".to_string(), name.clone()));
1571                        }
1572                        GraphTypeAlteration::AddEdgeType(name) => {
1573                            self.catalog
1574                                .alter_graph_type_add_edge_type(&effective_name, name.clone())
1575                                .map_err(|e| {
1576                                    Error::Query(QueryError::new(
1577                                        QueryErrorKind::Semantic,
1578                                        e.to_string(),
1579                                    ))
1580                                })?;
1581                            wal_alts.push(("add_edge_type".to_string(), name.clone()));
1582                        }
1583                        GraphTypeAlteration::DropEdgeType(name) => {
1584                            self.catalog
1585                                .alter_graph_type_drop_edge_type(&effective_name, name)
1586                                .map_err(|e| {
1587                                    Error::Query(QueryError::new(
1588                                        QueryErrorKind::Semantic,
1589                                        e.to_string(),
1590                                    ))
1591                                })?;
1592                            wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1593                        }
1594                    }
1595                }
1596                wal_log!(
1597                    self,
1598                    WalRecord::AlterGraphType {
1599                        name: effective_name,
1600                        alterations: wal_alts,
1601                    }
1602                );
1603                Ok(QueryResult::status(format!(
1604                    "Altered graph type '{}'",
1605                    stmt.name
1606                )))
1607            }
1608            SchemaStatement::CreateProcedure(stmt) => {
1609                use crate::catalog::ProcedureDefinition;
1610
1611                let def = ProcedureDefinition {
1612                    name: stmt.name.clone(),
1613                    params: stmt
1614                        .params
1615                        .iter()
1616                        .map(|p| (p.name.clone(), p.param_type.clone()))
1617                        .collect(),
1618                    returns: stmt
1619                        .returns
1620                        .iter()
1621                        .map(|r| (r.name.clone(), r.return_type.clone()))
1622                        .collect(),
1623                    body: stmt.body.clone(),
1624                };
1625
1626                if stmt.or_replace {
1627                    self.catalog.replace_procedure(def).map_err(|e| {
1628                        Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1629                    })?;
1630                } else {
1631                    match self.catalog.register_procedure(def) {
1632                        Ok(()) => {}
1633                        Err(_) if stmt.if_not_exists => {
1634                            return Ok(QueryResult::empty());
1635                        }
1636                        Err(e) => {
1637                            return Err(Error::Query(QueryError::new(
1638                                QueryErrorKind::Semantic,
1639                                e.to_string(),
1640                            )));
1641                        }
1642                    }
1643                }
1644
1645                wal_log!(
1646                    self,
1647                    WalRecord::CreateProcedure {
1648                        name: stmt.name.clone(),
1649                        params: stmt
1650                            .params
1651                            .iter()
1652                            .map(|p| (p.name.clone(), p.param_type.clone()))
1653                            .collect(),
1654                        returns: stmt
1655                            .returns
1656                            .iter()
1657                            .map(|r| (r.name.clone(), r.return_type.clone()))
1658                            .collect(),
1659                        body: stmt.body,
1660                    }
1661                );
1662                Ok(QueryResult::status(format!(
1663                    "Created procedure '{}'",
1664                    stmt.name
1665                )))
1666            }
1667            SchemaStatement::DropProcedure { name, if_exists } => {
1668                match self.catalog.drop_procedure(&name) {
1669                    Ok(()) => {}
1670                    Err(_) if if_exists => {
1671                        return Ok(QueryResult::empty());
1672                    }
1673                    Err(e) => {
1674                        return Err(Error::Query(QueryError::new(
1675                            QueryErrorKind::Semantic,
1676                            e.to_string(),
1677                        )));
1678                    }
1679                }
1680                wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1681                Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1682            }
1683            SchemaStatement::ShowIndexes => {
1684                return self.execute_show_indexes();
1685            }
1686            SchemaStatement::ShowConstraints => {
1687                return self.execute_show_constraints();
1688            }
1689            SchemaStatement::ShowNodeTypes => {
1690                return self.execute_show_node_types();
1691            }
1692            SchemaStatement::ShowEdgeTypes => {
1693                return self.execute_show_edge_types();
1694            }
1695            SchemaStatement::ShowGraphTypes => {
1696                return self.execute_show_graph_types();
1697            }
1698            SchemaStatement::ShowGraphType(name) => {
1699                return self.execute_show_graph_type(&name);
1700            }
1701            SchemaStatement::ShowCurrentGraphType => {
1702                return self.execute_show_current_graph_type();
1703            }
1704            SchemaStatement::ShowGraphs => {
1705                return self.execute_show_graphs();
1706            }
1707            SchemaStatement::ShowSchemas => {
1708                return self.execute_show_schemas();
1709            }
1710        };
1711
1712        // Invalidate all cached query plans after any successful DDL change.
1713        // DDL is rare, so clearing the entire cache is cheap and correct.
1714        if result.is_ok() {
1715            self.query_cache.clear();
1716        }
1717
1718        result
1719    }
1720
1721    /// Creates a vector index on the store by scanning existing nodes.
1722    #[cfg(all(feature = "gql", feature = "vector-index"))]
1723    fn create_vector_index_on_store(
1724        store: &LpgStore,
1725        label: &str,
1726        property: &str,
1727        dimensions: Option<usize>,
1728        metric: Option<&str>,
1729    ) -> Result<()> {
1730        use grafeo_common::types::{PropertyKey, Value};
1731        use grafeo_common::utils::error::Error;
1732        use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};
1733
1734        let metric = match metric {
1735            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1736                Error::Internal(format!(
1737                    "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
1738                ))
1739            })?,
1740            None => DistanceMetric::Cosine,
1741        };
1742
1743        let prop_key = PropertyKey::new(property);
1744        let mut found_dims: Option<usize> = dimensions;
1745        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1746
1747        for node in store.nodes_with_label(label) {
1748            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1749                if let Some(expected) = found_dims {
1750                    if v.len() != expected {
1751                        return Err(Error::Internal(format!(
1752                            "Vector dimension mismatch: expected {expected}, found {} on node {}",
1753                            v.len(),
1754                            node.id.0
1755                        )));
1756                    }
1757                } else {
1758                    found_dims = Some(v.len());
1759                }
1760                vectors.push((node.id, v.to_vec()));
1761            }
1762        }
1763
1764        let Some(dims) = found_dims else {
1765            return Err(Error::Internal(format!(
1766                "No vector properties found on :{label}({property}) and no dimensions specified"
1767            )));
1768        };
1769
1770        let config = HnswConfig::new(dims, metric);
1771        let index = HnswIndex::with_capacity(config, vectors.len());
1772        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
1773        for (node_id, vec) in &vectors {
1774            index.insert(*node_id, vec, &accessor);
1775        }
1776
1777        store.add_vector_index(label, property, Arc::new(index));
1778        Ok(())
1779    }
1780
1781    /// Stub for when vector-index feature is not enabled.
1782    #[cfg(all(feature = "gql", not(feature = "vector-index")))]
1783    fn create_vector_index_on_store(
1784        _store: &LpgStore,
1785        _label: &str,
1786        _property: &str,
1787        _dimensions: Option<usize>,
1788        _metric: Option<&str>,
1789    ) -> Result<()> {
1790        Err(grafeo_common::utils::error::Error::Internal(
1791            "Vector index support requires the 'vector-index' feature".to_string(),
1792        ))
1793    }
1794
1795    /// Creates a text index on the store by scanning existing nodes.
1796    #[cfg(all(feature = "gql", feature = "text-index"))]
1797    fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
1798        use grafeo_common::types::{PropertyKey, Value};
1799        use grafeo_core::index::text::{BM25Config, InvertedIndex};
1800
1801        let mut index = InvertedIndex::new(BM25Config::default());
1802        let prop_key = PropertyKey::new(property);
1803
1804        let nodes = store.nodes_by_label(label);
1805        for node_id in nodes {
1806            if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
1807                index.insert(node_id, text.as_str());
1808            }
1809        }
1810
1811        store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
1812        Ok(())
1813    }
1814
1815    /// Stub for when text-index feature is not enabled.
1816    #[cfg(all(feature = "gql", not(feature = "text-index")))]
1817    fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
1818        Err(grafeo_common::utils::error::Error::Internal(
1819            "Text index support requires the 'text-index' feature".to_string(),
1820        ))
1821    }
1822
1823    /// Returns a table of all indexes from the catalog.
1824    fn execute_show_indexes(&self) -> Result<QueryResult> {
1825        let indexes = self.catalog.all_indexes();
1826        let columns = vec![
1827            "name".to_string(),
1828            "type".to_string(),
1829            "label".to_string(),
1830            "property".to_string(),
1831        ];
1832        let rows: Vec<Vec<Value>> = indexes
1833            .into_iter()
1834            .map(|def| {
1835                let label_name = self
1836                    .catalog
1837                    .get_label_name(def.label)
1838                    .unwrap_or_else(|| "?".into());
1839                let prop_name = self
1840                    .catalog
1841                    .get_property_key_name(def.property_key)
1842                    .unwrap_or_else(|| "?".into());
1843                vec![
1844                    Value::from(format!("idx_{}_{}", label_name, prop_name)),
1845                    Value::from(format!("{:?}", def.index_type)),
1846                    Value::from(&*label_name),
1847                    Value::from(&*prop_name),
1848                ]
1849            })
1850            .collect();
1851        Ok(QueryResult {
1852            columns,
1853            column_types: Vec::new(),
1854            rows,
1855            ..QueryResult::empty()
1856        })
1857    }
1858
1859    /// Returns a table of all constraints (currently metadata-only).
1860    fn execute_show_constraints(&self) -> Result<QueryResult> {
1861        // Constraints are tracked in WAL but not yet in a queryable catalog.
1862        // Return an empty table with the expected schema.
1863        Ok(QueryResult {
1864            columns: vec![
1865                "name".to_string(),
1866                "type".to_string(),
1867                "label".to_string(),
1868                "properties".to_string(),
1869            ],
1870            column_types: Vec::new(),
1871            rows: Vec::new(),
1872            ..QueryResult::empty()
1873        })
1874    }
1875
1876    /// Returns a table of all registered node types in the current schema.
1877    fn execute_show_node_types(&self) -> Result<QueryResult> {
1878        let columns = vec![
1879            "name".to_string(),
1880            "properties".to_string(),
1881            "constraints".to_string(),
1882            "parents".to_string(),
1883        ];
1884        let schema = self.current_schema.lock().clone();
1885        let all_names = self.catalog.all_node_type_names();
1886        let type_names: Vec<String> = match &schema {
1887            Some(s) => {
1888                let prefix = format!("{s}/");
1889                all_names
1890                    .into_iter()
1891                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
1892                    .collect()
1893            }
1894            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
1895        };
1896        let rows: Vec<Vec<Value>> = type_names
1897            .into_iter()
1898            .filter_map(|name| {
1899                let lookup = match &schema {
1900                    Some(s) => format!("{s}/{name}"),
1901                    None => name.clone(),
1902                };
1903                let def = self.catalog.get_node_type(&lookup)?;
1904                let props: Vec<String> = def
1905                    .properties
1906                    .iter()
1907                    .map(|p| {
1908                        let nullable = if p.nullable { "" } else { " NOT NULL" };
1909                        format!("{} {}{}", p.name, p.data_type, nullable)
1910                    })
1911                    .collect();
1912                let constraints: Vec<String> =
1913                    def.constraints.iter().map(|c| format!("{c:?}")).collect();
1914                let parents = def.parent_types.join(", ");
1915                Some(vec![
1916                    Value::from(name),
1917                    Value::from(props.join(", ")),
1918                    Value::from(constraints.join(", ")),
1919                    Value::from(parents),
1920                ])
1921            })
1922            .collect();
1923        Ok(QueryResult {
1924            columns,
1925            column_types: Vec::new(),
1926            rows,
1927            ..QueryResult::empty()
1928        })
1929    }
1930
1931    /// Returns a table of all registered edge types in the current schema.
1932    fn execute_show_edge_types(&self) -> Result<QueryResult> {
1933        let columns = vec![
1934            "name".to_string(),
1935            "properties".to_string(),
1936            "source_types".to_string(),
1937            "target_types".to_string(),
1938        ];
1939        let schema = self.current_schema.lock().clone();
1940        let all_names = self.catalog.all_edge_type_names();
1941        let type_names: Vec<String> = match &schema {
1942            Some(s) => {
1943                let prefix = format!("{s}/");
1944                all_names
1945                    .into_iter()
1946                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
1947                    .collect()
1948            }
1949            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
1950        };
1951        let rows: Vec<Vec<Value>> = type_names
1952            .into_iter()
1953            .filter_map(|name| {
1954                let lookup = match &schema {
1955                    Some(s) => format!("{s}/{name}"),
1956                    None => name.clone(),
1957                };
1958                let def = self.catalog.get_edge_type_def(&lookup)?;
1959                let props: Vec<String> = def
1960                    .properties
1961                    .iter()
1962                    .map(|p| {
1963                        let nullable = if p.nullable { "" } else { " NOT NULL" };
1964                        format!("{} {}{}", p.name, p.data_type, nullable)
1965                    })
1966                    .collect();
1967                let src = def.source_node_types.join(", ");
1968                let tgt = def.target_node_types.join(", ");
1969                Some(vec![
1970                    Value::from(name),
1971                    Value::from(props.join(", ")),
1972                    Value::from(src),
1973                    Value::from(tgt),
1974                ])
1975            })
1976            .collect();
1977        Ok(QueryResult {
1978            columns,
1979            column_types: Vec::new(),
1980            rows,
1981            ..QueryResult::empty()
1982        })
1983    }
1984
1985    /// Returns a table of all registered graph types in the current schema.
1986    fn execute_show_graph_types(&self) -> Result<QueryResult> {
1987        let columns = vec![
1988            "name".to_string(),
1989            "open".to_string(),
1990            "node_types".to_string(),
1991            "edge_types".to_string(),
1992        ];
1993        let schema = self.current_schema.lock().clone();
1994        let all_names = self.catalog.all_graph_type_names();
1995        let type_names: Vec<String> = match &schema {
1996            Some(s) => {
1997                let prefix = format!("{s}/");
1998                all_names
1999                    .into_iter()
2000                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2001                    .collect()
2002            }
2003            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2004        };
2005        let rows: Vec<Vec<Value>> = type_names
2006            .into_iter()
2007            .filter_map(|name| {
2008                let lookup = match &schema {
2009                    Some(s) => format!("{s}/{name}"),
2010                    None => name.clone(),
2011                };
2012                let def = self.catalog.get_graph_type_def(&lookup)?;
2013                // Strip schema prefix from allowed type names for display
2014                let strip = |n: &String| -> String {
2015                    match &schema {
2016                        Some(s) => n.strip_prefix(&format!("{s}/")).unwrap_or(n).to_string(),
2017                        None => n.clone(),
2018                    }
2019                };
2020                let node_types: Vec<String> = def.allowed_node_types.iter().map(strip).collect();
2021                let edge_types: Vec<String> = def.allowed_edge_types.iter().map(strip).collect();
2022                Some(vec![
2023                    Value::from(name),
2024                    Value::from(def.open),
2025                    Value::from(node_types.join(", ")),
2026                    Value::from(edge_types.join(", ")),
2027                ])
2028            })
2029            .collect();
2030        Ok(QueryResult {
2031            columns,
2032            column_types: Vec::new(),
2033            rows,
2034            ..QueryResult::empty()
2035        })
2036    }
2037
2038    /// Returns the list of named graphs visible in the current schema context.
2039    ///
2040    /// When a session schema is set, only graphs belonging to that schema are
2041    /// shown (their compound prefix is stripped). When no schema is set, graphs
2042    /// without a schema prefix are shown (the default schema).
2043    fn execute_show_graphs(&self) -> Result<QueryResult> {
2044        let schema = self.current_schema.lock().clone();
2045        let all_names = self.store.graph_names();
2046
2047        let mut names: Vec<String> = match &schema {
2048            Some(s) => {
2049                let prefix = format!("{s}/");
2050                all_names
2051                    .into_iter()
2052                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
2053                    .collect()
2054            }
2055            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
2056        };
2057        names.sort();
2058
2059        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2060        Ok(QueryResult {
2061            columns: vec!["name".to_string()],
2062            column_types: Vec::new(),
2063            rows,
2064            ..QueryResult::empty()
2065        })
2066    }
2067
2068    /// Returns the list of all schema namespaces.
2069    fn execute_show_schemas(&self) -> Result<QueryResult> {
2070        let mut names = self.catalog.schema_names();
2071        names.sort();
2072        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
2073        Ok(QueryResult {
2074            columns: vec!["name".to_string()],
2075            column_types: Vec::new(),
2076            rows,
2077            ..QueryResult::empty()
2078        })
2079    }
2080
2081    /// Returns detailed info for a specific graph type.
2082    fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
2083        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
2084
2085        let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
2086            Error::Query(QueryError::new(
2087                QueryErrorKind::Semantic,
2088                format!("Graph type '{name}' not found"),
2089            ))
2090        })?;
2091
2092        let columns = vec![
2093            "name".to_string(),
2094            "open".to_string(),
2095            "node_types".to_string(),
2096            "edge_types".to_string(),
2097        ];
2098        let rows = vec![vec![
2099            Value::from(def.name),
2100            Value::from(def.open),
2101            Value::from(def.allowed_node_types.join(", ")),
2102            Value::from(def.allowed_edge_types.join(", ")),
2103        ]];
2104        Ok(QueryResult {
2105            columns,
2106            column_types: Vec::new(),
2107            rows,
2108            ..QueryResult::empty()
2109        })
2110    }
2111
2112    /// Returns the graph type bound to the current graph.
2113    fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
2114        let graph_name = self
2115            .current_graph()
2116            .unwrap_or_else(|| "default".to_string());
2117        let columns = vec![
2118            "graph".to_string(),
2119            "graph_type".to_string(),
2120            "open".to_string(),
2121            "node_types".to_string(),
2122            "edge_types".to_string(),
2123        ];
2124
2125        if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
2126            && let Some(def) = self.catalog.get_graph_type_def(&type_name)
2127        {
2128            let rows = vec![vec![
2129                Value::from(graph_name),
2130                Value::from(type_name),
2131                Value::from(def.open),
2132                Value::from(def.allowed_node_types.join(", ")),
2133                Value::from(def.allowed_edge_types.join(", ")),
2134            ]];
2135            return Ok(QueryResult {
2136                columns,
2137                column_types: Vec::new(),
2138                rows,
2139                ..QueryResult::empty()
2140            });
2141        }
2142
2143        // No graph type binding found
2144        Ok(QueryResult {
2145            columns,
2146            column_types: Vec::new(),
2147            rows: vec![vec![
2148                Value::from(graph_name),
2149                Value::Null,
2150                Value::Null,
2151                Value::Null,
2152                Value::Null,
2153            ]],
2154            ..QueryResult::empty()
2155        })
2156    }
2157
2158    /// Executes a GQL query.
2159    ///
2160    /// # Errors
2161    ///
2162    /// Returns an error if the query fails to parse or execute.
2163    ///
2164    /// # Examples
2165    ///
2166    /// ```no_run
2167    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2168    /// use grafeo_engine::GrafeoDB;
2169    ///
2170    /// let db = GrafeoDB::new_in_memory();
2171    /// let session = db.session();
2172    ///
2173    /// // Create a node
2174    /// session.execute("INSERT (:Person {name: 'Alix', age: 30})")?;
2175    ///
2176    /// // Query nodes
2177    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
2178    /// for row in &result.rows {
2179    ///     println!("{:?}", row);
2180    /// }
2181    /// # Ok(())
2182    /// # }
2183    /// ```
2184    #[cfg(feature = "gql")]
2185    pub fn execute(&self, query: &str) -> Result<QueryResult> {
2186        self.require_lpg("GQL")?;
2187
2188        use crate::query::{
2189            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2190            processor::QueryLanguage, translators::gql,
2191        };
2192
2193        let _span = grafeo_info_span!(
2194            "grafeo::session::execute",
2195            language = "gql",
2196            query_len = query.len(),
2197        );
2198
2199        #[cfg(not(target_arch = "wasm32"))]
2200        let start_time = std::time::Instant::now();
2201
2202        // Parse and translate, checking for session/schema commands first
2203        let translation = gql::translate_full(query)?;
2204        let logical_plan = match translation {
2205            gql::GqlTranslationResult::SessionCommand(cmd) => {
2206                return self.execute_session_command(cmd);
2207            }
2208            gql::GqlTranslationResult::SchemaCommand(cmd) => {
2209                // All DDL is a write operation
2210                if *self.read_only_tx.lock() {
2211                    return Err(grafeo_common::utils::error::Error::Transaction(
2212                        grafeo_common::utils::error::TransactionError::ReadOnly,
2213                    ));
2214                }
2215                return self.execute_schema_command(cmd);
2216            }
2217            gql::GqlTranslationResult::Plan(plan) => {
2218                // Block mutations in read-only transactions
2219                if *self.read_only_tx.lock() && plan.root.has_mutations() {
2220                    return Err(grafeo_common::utils::error::Error::Transaction(
2221                        grafeo_common::utils::error::TransactionError::ReadOnly,
2222                    ));
2223                }
2224                plan
2225            }
2226        };
2227
2228        // Create cache key for this query
2229        let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
2230
2231        // Try to get cached optimized plan, or use the plan we just translated
2232        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2233            cached_plan
2234        } else {
2235            // Semantic validation
2236            let mut binder = Binder::new();
2237            let _binding_context = binder.bind(&logical_plan)?;
2238
2239            // Optimize the plan
2240            let active = self.active_store();
2241            let optimizer = Optimizer::from_graph_store(&*active);
2242            let plan = optimizer.optimize(logical_plan)?;
2243
2244            // Cache the optimized plan for future use
2245            self.query_cache.put_optimized(cache_key, plan.clone());
2246
2247            plan
2248        };
2249
2250        // Resolve the active store for query execution
2251        let active = self.active_store();
2252
2253        // EXPLAIN: annotate pushdown hints and return the plan tree
2254        if optimized_plan.explain {
2255            use crate::query::processor::{annotate_pushdown_hints, explain_result};
2256            let mut plan = optimized_plan;
2257            annotate_pushdown_hints(&mut plan.root, active.as_ref());
2258            return Ok(explain_result(&plan));
2259        }
2260
2261        // PROFILE: execute with per-operator instrumentation
2262        if optimized_plan.profile {
2263            let has_mutations = optimized_plan.root.has_mutations();
2264            return self.with_auto_commit(has_mutations, || {
2265                let (viewing_epoch, transaction_id) = self.get_transaction_context();
2266                let planner = self.create_planner_for_store(
2267                    Arc::clone(&active),
2268                    viewing_epoch,
2269                    transaction_id,
2270                );
2271                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2272
2273                let executor = Executor::with_columns(physical_plan.columns.clone())
2274                    .with_deadline(self.query_deadline());
2275                let _result = executor.execute(physical_plan.operator.as_mut())?;
2276
2277                let total_time_ms;
2278                #[cfg(not(target_arch = "wasm32"))]
2279                {
2280                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2281                }
2282                #[cfg(target_arch = "wasm32")]
2283                {
2284                    total_time_ms = 0.0;
2285                }
2286
2287                let profile_tree = crate::query::profile::build_profile_tree(
2288                    &optimized_plan.root,
2289                    &mut entries.into_iter(),
2290                );
2291                Ok(crate::query::profile::profile_result(
2292                    &profile_tree,
2293                    total_time_ms,
2294                ))
2295            });
2296        }
2297
2298        let has_mutations = optimized_plan.root.has_mutations();
2299
2300        let result = self.with_auto_commit(has_mutations, || {
2301            // Get transaction context for MVCC visibility
2302            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2303
2304            // Convert to physical plan with transaction context
2305            // (Physical planning cannot be cached as it depends on transaction state)
2306            // Safe to use read-only fast path when: this query has no mutations AND
2307            // there is no active transaction that may have prior uncommitted writes.
2308            let has_active_tx = self.current_transaction.lock().is_some();
2309            let read_only = !has_mutations && !has_active_tx;
2310            let planner = self.create_planner_for_store_with_read_only(
2311                Arc::clone(&active),
2312                viewing_epoch,
2313                transaction_id,
2314                read_only,
2315            );
2316            let mut physical_plan = planner.plan(&optimized_plan)?;
2317
2318            // Execute the plan
2319            let executor = Executor::with_columns(physical_plan.columns.clone())
2320                .with_deadline(self.query_deadline());
2321            let mut result = executor.execute(physical_plan.operator.as_mut())?;
2322
2323            // Add execution metrics
2324            let rows_scanned = result.rows.len() as u64;
2325            #[cfg(not(target_arch = "wasm32"))]
2326            {
2327                let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2328                result.execution_time_ms = Some(elapsed_ms);
2329            }
2330            result.rows_scanned = Some(rows_scanned);
2331
2332            Ok(result)
2333        });
2334
2335        // Record metrics for this query execution.
2336        #[cfg(feature = "metrics")]
2337        {
2338            #[cfg(not(target_arch = "wasm32"))]
2339            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2340            #[cfg(target_arch = "wasm32")]
2341            let elapsed_ms = None;
2342            self.record_query_metrics("gql", elapsed_ms, &result);
2343        }
2344
2345        result
2346    }
2347
2348    /// Executes a GQL query with visibility at the specified epoch.
2349    ///
2350    /// This enables time-travel queries: the query sees the database
2351    /// as it existed at the given epoch.
2352    ///
2353    /// # Errors
2354    ///
2355    /// Returns an error if parsing or execution fails.
2356    #[cfg(feature = "gql")]
2357    pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
2358        let previous = self.viewing_epoch_override.lock().replace(epoch);
2359        let result = self.execute(query);
2360        *self.viewing_epoch_override.lock() = previous;
2361        result
2362    }
2363
2364    /// Executes a GQL query at a specific epoch with optional parameters.
2365    ///
2366    /// Combines epoch-based time travel with parameterized queries.
2367    ///
2368    /// # Errors
2369    ///
2370    /// Returns an error if parsing or execution fails.
2371    #[cfg(feature = "gql")]
2372    pub fn execute_at_epoch_with_params(
2373        &self,
2374        query: &str,
2375        epoch: EpochId,
2376        params: Option<std::collections::HashMap<String, Value>>,
2377    ) -> Result<QueryResult> {
2378        let previous = self.viewing_epoch_override.lock().replace(epoch);
2379        let result = if let Some(p) = params {
2380            self.execute_with_params(query, p)
2381        } else {
2382            self.execute(query)
2383        };
2384        *self.viewing_epoch_override.lock() = previous;
2385        result
2386    }
2387
2388    /// Executes a GQL query with parameters.
2389    ///
2390    /// # Errors
2391    ///
2392    /// Returns an error if the query fails to parse or execute.
2393    #[cfg(feature = "gql")]
2394    pub fn execute_with_params(
2395        &self,
2396        query: &str,
2397        params: std::collections::HashMap<String, Value>,
2398    ) -> Result<QueryResult> {
2399        self.require_lpg("GQL")?;
2400
2401        use crate::query::processor::{QueryLanguage, QueryProcessor};
2402
2403        let has_mutations = Self::query_looks_like_mutation(query);
2404        let active = self.active_store();
2405
2406        self.with_auto_commit(has_mutations, || {
2407            // Get transaction context for MVCC visibility
2408            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2409
2410            // Create processor with transaction context
2411            let processor = QueryProcessor::for_graph_store_with_transaction(
2412                Arc::clone(&active),
2413                Arc::clone(&self.transaction_manager),
2414            )?;
2415
2416            // Apply transaction context if in a transaction
2417            let processor = if let Some(transaction_id) = transaction_id {
2418                processor.with_transaction_context(viewing_epoch, transaction_id)
2419            } else {
2420                processor
2421            };
2422
2423            processor.process(query, QueryLanguage::Gql, Some(&params))
2424        })
2425    }
2426
2427    /// Executes a GQL query with parameters.
2428    ///
2429    /// # Errors
2430    ///
2431    /// Returns an error if no query language is enabled.
2432    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2433    pub fn execute_with_params(
2434        &self,
2435        _query: &str,
2436        _params: std::collections::HashMap<String, Value>,
2437    ) -> Result<QueryResult> {
2438        Err(grafeo_common::utils::error::Error::Internal(
2439            "No query language enabled".to_string(),
2440        ))
2441    }
2442
2443    /// Executes a GQL query.
2444    ///
2445    /// # Errors
2446    ///
2447    /// Returns an error if no query language is enabled.
2448    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2449    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2450        Err(grafeo_common::utils::error::Error::Internal(
2451            "No query language enabled".to_string(),
2452        ))
2453    }
2454
2455    /// Executes a Cypher query.
2456    ///
2457    /// # Errors
2458    ///
2459    /// Returns an error if the query fails to parse or execute.
2460    #[cfg(feature = "cypher")]
2461    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
2462        use crate::query::{
2463            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2464            processor::QueryLanguage, translators::cypher,
2465        };
2466        use grafeo_common::utils::error::{Error as GrafeoError, QueryError, QueryErrorKind};
2467
2468        // Handle schema DDL and SHOW commands before the normal query path
2469        let translation = cypher::translate_full(query)?;
2470        match translation {
2471            cypher::CypherTranslationResult::SchemaCommand(cmd) => {
2472                if *self.read_only_tx.lock() {
2473                    return Err(GrafeoError::Query(QueryError::new(
2474                        QueryErrorKind::Semantic,
2475                        "Cannot execute schema DDL in a read-only transaction",
2476                    )));
2477                }
2478                return self.execute_schema_command(cmd);
2479            }
2480            cypher::CypherTranslationResult::ShowIndexes => {
2481                return self.execute_show_indexes();
2482            }
2483            cypher::CypherTranslationResult::ShowConstraints => {
2484                return self.execute_show_constraints();
2485            }
2486            cypher::CypherTranslationResult::ShowCurrentGraphType => {
2487                return self.execute_show_current_graph_type();
2488            }
2489            cypher::CypherTranslationResult::Plan(_) => {
2490                // Fall through to normal execution below
2491            }
2492        }
2493
2494        #[cfg(not(target_arch = "wasm32"))]
2495        let start_time = std::time::Instant::now();
2496
2497        // Create cache key for this query
2498        let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());
2499
2500        // Try to get cached optimized plan
2501        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2502            cached_plan
2503        } else {
2504            // Parse and translate the query to a logical plan
2505            let logical_plan = cypher::translate(query)?;
2506
2507            // Semantic validation
2508            let mut binder = Binder::new();
2509            let _binding_context = binder.bind(&logical_plan)?;
2510
2511            // Optimize the plan
2512            let active = self.active_store();
2513            let optimizer = Optimizer::from_graph_store(&*active);
2514            let plan = optimizer.optimize(logical_plan)?;
2515
2516            // Cache the optimized plan
2517            self.query_cache.put_optimized(cache_key, plan.clone());
2518
2519            plan
2520        };
2521
2522        // Resolve the active store for query execution
2523        let active = self.active_store();
2524
2525        // EXPLAIN
2526        if optimized_plan.explain {
2527            use crate::query::processor::{annotate_pushdown_hints, explain_result};
2528            let mut plan = optimized_plan;
2529            annotate_pushdown_hints(&mut plan.root, active.as_ref());
2530            return Ok(explain_result(&plan));
2531        }
2532
2533        // PROFILE
2534        if optimized_plan.profile {
2535            let has_mutations = optimized_plan.root.has_mutations();
2536            return self.with_auto_commit(has_mutations, || {
2537                let (viewing_epoch, transaction_id) = self.get_transaction_context();
2538                let planner = self.create_planner_for_store(
2539                    Arc::clone(&active),
2540                    viewing_epoch,
2541                    transaction_id,
2542                );
2543                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2544
2545                let executor = Executor::with_columns(physical_plan.columns.clone())
2546                    .with_deadline(self.query_deadline());
2547                let _result = executor.execute(physical_plan.operator.as_mut())?;
2548
2549                let total_time_ms;
2550                #[cfg(not(target_arch = "wasm32"))]
2551                {
2552                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2553                }
2554                #[cfg(target_arch = "wasm32")]
2555                {
2556                    total_time_ms = 0.0;
2557                }
2558
2559                let profile_tree = crate::query::profile::build_profile_tree(
2560                    &optimized_plan.root,
2561                    &mut entries.into_iter(),
2562                );
2563                Ok(crate::query::profile::profile_result(
2564                    &profile_tree,
2565                    total_time_ms,
2566                ))
2567            });
2568        }
2569
2570        let has_mutations = optimized_plan.root.has_mutations();
2571
2572        let result = self.with_auto_commit(has_mutations, || {
2573            // Get transaction context for MVCC visibility
2574            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2575
2576            // Convert to physical plan with transaction context
2577            let planner =
2578                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2579            let mut physical_plan = planner.plan(&optimized_plan)?;
2580
2581            // Execute the plan
2582            let executor = Executor::with_columns(physical_plan.columns.clone())
2583                .with_deadline(self.query_deadline());
2584            executor.execute(physical_plan.operator.as_mut())
2585        });
2586
2587        #[cfg(feature = "metrics")]
2588        {
2589            #[cfg(not(target_arch = "wasm32"))]
2590            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2591            #[cfg(target_arch = "wasm32")]
2592            let elapsed_ms = None;
2593            self.record_query_metrics("cypher", elapsed_ms, &result);
2594        }
2595
2596        result
2597    }
2598
2599    /// Executes a Gremlin query.
2600    ///
2601    /// # Errors
2602    ///
2603    /// Returns an error if the query fails to parse or execute.
2604    ///
2605    /// # Examples
2606    ///
2607    /// ```no_run
2608    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2609    /// use grafeo_engine::GrafeoDB;
2610    ///
2611    /// let db = GrafeoDB::new_in_memory();
2612    /// let session = db.session();
2613    ///
2614    /// // Create some nodes first
2615    /// session.create_node(&["Person"]);
2616    ///
2617    /// // Query using Gremlin
2618    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
2619    /// # Ok(())
2620    /// # }
2621    /// ```
2622    #[cfg(feature = "gremlin")]
2623    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
2624        use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};
2625
2626        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2627        let start_time = Instant::now();
2628
2629        // Parse and translate the query to a logical plan
2630        let logical_plan = gremlin::translate(query)?;
2631
2632        // Semantic validation
2633        let mut binder = Binder::new();
2634        let _binding_context = binder.bind(&logical_plan)?;
2635
2636        // Optimize the plan
2637        let active = self.active_store();
2638        let optimizer = Optimizer::from_graph_store(&*active);
2639        let optimized_plan = optimizer.optimize(logical_plan)?;
2640
2641        let has_mutations = optimized_plan.root.has_mutations();
2642
2643        let result = self.with_auto_commit(has_mutations, || {
2644            // Get transaction context for MVCC visibility
2645            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2646
2647            // Convert to physical plan with transaction context
2648            let planner =
2649                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2650            let mut physical_plan = planner.plan(&optimized_plan)?;
2651
2652            // Execute the plan
2653            let executor = Executor::with_columns(physical_plan.columns.clone())
2654                .with_deadline(self.query_deadline());
2655            executor.execute(physical_plan.operator.as_mut())
2656        });
2657
2658        #[cfg(feature = "metrics")]
2659        {
2660            #[cfg(not(target_arch = "wasm32"))]
2661            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2662            #[cfg(target_arch = "wasm32")]
2663            let elapsed_ms = None;
2664            self.record_query_metrics("gremlin", elapsed_ms, &result);
2665        }
2666
2667        result
2668    }
2669
2670    /// Executes a Gremlin query with parameters.
2671    ///
2672    /// # Errors
2673    ///
2674    /// Returns an error if the query fails to parse or execute.
2675    #[cfg(feature = "gremlin")]
2676    pub fn execute_gremlin_with_params(
2677        &self,
2678        query: &str,
2679        params: std::collections::HashMap<String, Value>,
2680    ) -> Result<QueryResult> {
2681        use crate::query::processor::{QueryLanguage, QueryProcessor};
2682
2683        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2684        let start_time = Instant::now();
2685
2686        let has_mutations = Self::query_looks_like_mutation(query);
2687        let active = self.active_store();
2688
2689        let result = self.with_auto_commit(has_mutations, || {
2690            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2691            let processor = QueryProcessor::for_graph_store_with_transaction(
2692                Arc::clone(&active),
2693                Arc::clone(&self.transaction_manager),
2694            )?;
2695            let processor = if let Some(transaction_id) = transaction_id {
2696                processor.with_transaction_context(viewing_epoch, transaction_id)
2697            } else {
2698                processor
2699            };
2700            processor.process(query, QueryLanguage::Gremlin, Some(&params))
2701        });
2702
2703        #[cfg(feature = "metrics")]
2704        {
2705            #[cfg(not(target_arch = "wasm32"))]
2706            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2707            #[cfg(target_arch = "wasm32")]
2708            let elapsed_ms = None;
2709            self.record_query_metrics("gremlin", elapsed_ms, &result);
2710        }
2711
2712        result
2713    }
2714
2715    /// Executes a GraphQL query against the LPG store.
2716    ///
2717    /// # Errors
2718    ///
2719    /// Returns an error if the query fails to parse or execute.
2720    ///
2721    /// # Examples
2722    ///
2723    /// ```no_run
2724    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2725    /// use grafeo_engine::GrafeoDB;
2726    ///
2727    /// let db = GrafeoDB::new_in_memory();
2728    /// let session = db.session();
2729    ///
2730    /// // Create some nodes first
2731    /// session.create_node(&["User"]);
2732    ///
2733    /// // Query using GraphQL
2734    /// let result = session.execute_graphql("query { user { id name } }")?;
2735    /// # Ok(())
2736    /// # }
2737    /// ```
2738    #[cfg(feature = "graphql")]
2739    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
2740        use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::graphql};
2741
2742        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2743        let start_time = Instant::now();
2744
2745        let logical_plan = graphql::translate(query)?;
2746        let mut binder = Binder::new();
2747        let _binding_context = binder.bind(&logical_plan)?;
2748
2749        let active = self.active_store();
2750        let optimizer = Optimizer::from_graph_store(&*active);
2751        let optimized_plan = optimizer.optimize(logical_plan)?;
2752        let has_mutations = optimized_plan.root.has_mutations();
2753
2754        let result = self.with_auto_commit(has_mutations, || {
2755            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2756            let planner =
2757                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2758            let mut physical_plan = planner.plan(&optimized_plan)?;
2759            let executor = Executor::with_columns(physical_plan.columns.clone())
2760                .with_deadline(self.query_deadline());
2761            executor.execute(physical_plan.operator.as_mut())
2762        });
2763
2764        #[cfg(feature = "metrics")]
2765        {
2766            #[cfg(not(target_arch = "wasm32"))]
2767            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2768            #[cfg(target_arch = "wasm32")]
2769            let elapsed_ms = None;
2770            self.record_query_metrics("graphql", elapsed_ms, &result);
2771        }
2772
2773        result
2774    }
2775
2776    /// Executes a GraphQL query with parameters.
2777    ///
2778    /// # Errors
2779    ///
2780    /// Returns an error if the query fails to parse or execute.
2781    #[cfg(feature = "graphql")]
2782    pub fn execute_graphql_with_params(
2783        &self,
2784        query: &str,
2785        params: std::collections::HashMap<String, Value>,
2786    ) -> Result<QueryResult> {
2787        use crate::query::processor::{QueryLanguage, QueryProcessor};
2788
2789        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2790        let start_time = Instant::now();
2791
2792        let has_mutations = Self::query_looks_like_mutation(query);
2793        let active = self.active_store();
2794
2795        let result = self.with_auto_commit(has_mutations, || {
2796            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2797            let processor = QueryProcessor::for_graph_store_with_transaction(
2798                Arc::clone(&active),
2799                Arc::clone(&self.transaction_manager),
2800            )?;
2801            let processor = if let Some(transaction_id) = transaction_id {
2802                processor.with_transaction_context(viewing_epoch, transaction_id)
2803            } else {
2804                processor
2805            };
2806            processor.process(query, QueryLanguage::GraphQL, Some(&params))
2807        });
2808
2809        #[cfg(feature = "metrics")]
2810        {
2811            #[cfg(not(target_arch = "wasm32"))]
2812            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2813            #[cfg(target_arch = "wasm32")]
2814            let elapsed_ms = None;
2815            self.record_query_metrics("graphql", elapsed_ms, &result);
2816        }
2817
2818        result
2819    }
2820
2821    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
2822    ///
2823    /// # Errors
2824    ///
2825    /// Returns an error if the query fails to parse or execute.
2826    ///
2827    /// # Examples
2828    ///
2829    /// ```no_run
2830    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2831    /// use grafeo_engine::GrafeoDB;
2832    ///
2833    /// let db = GrafeoDB::new_in_memory();
2834    /// let session = db.session();
2835    ///
2836    /// let result = session.execute_sql(
2837    ///     "SELECT * FROM GRAPH_TABLE (
2838    ///         MATCH (n:Person)
2839    ///         COLUMNS (n.name AS name)
2840    ///     )"
2841    /// )?;
2842    /// # Ok(())
2843    /// # }
2844    /// ```
2845    #[cfg(feature = "sql-pgq")]
2846    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
2847        use crate::query::{
2848            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
2849            processor::QueryLanguage, translators::sql_pgq,
2850        };
2851
2852        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2853        let start_time = Instant::now();
2854
2855        // Parse and translate (always needed to check for DDL)
2856        let logical_plan = sql_pgq::translate(query)?;
2857
2858        // Handle DDL statements directly (they don't go through the query pipeline)
2859        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
2860            return Ok(QueryResult {
2861                columns: vec!["status".into()],
2862                column_types: vec![grafeo_common::types::LogicalType::String],
2863                rows: vec![vec![Value::from(format!(
2864                    "Property graph '{}' created ({} node tables, {} edge tables)",
2865                    cpg.name,
2866                    cpg.node_tables.len(),
2867                    cpg.edge_tables.len()
2868                ))]],
2869                execution_time_ms: None,
2870                rows_scanned: None,
2871                status_message: None,
2872                gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2873            });
2874        }
2875
2876        let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());
2877
2878        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2879            cached_plan
2880        } else {
2881            let mut binder = Binder::new();
2882            let _binding_context = binder.bind(&logical_plan)?;
2883            let active = self.active_store();
2884            let optimizer = Optimizer::from_graph_store(&*active);
2885            let plan = optimizer.optimize(logical_plan)?;
2886            self.query_cache.put_optimized(cache_key, plan.clone());
2887            plan
2888        };
2889
2890        let active = self.active_store();
2891        let has_mutations = optimized_plan.root.has_mutations();
2892
2893        let result = self.with_auto_commit(has_mutations, || {
2894            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2895            let planner =
2896                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2897            let mut physical_plan = planner.plan(&optimized_plan)?;
2898            let executor = Executor::with_columns(physical_plan.columns.clone())
2899                .with_deadline(self.query_deadline());
2900            executor.execute(physical_plan.operator.as_mut())
2901        });
2902
2903        #[cfg(feature = "metrics")]
2904        {
2905            #[cfg(not(target_arch = "wasm32"))]
2906            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2907            #[cfg(target_arch = "wasm32")]
2908            let elapsed_ms = None;
2909            self.record_query_metrics("sql", elapsed_ms, &result);
2910        }
2911
2912        result
2913    }
2914
2915    /// Executes a SQL/PGQ query with parameters.
2916    ///
2917    /// # Errors
2918    ///
2919    /// Returns an error if the query fails to parse or execute.
2920    #[cfg(feature = "sql-pgq")]
2921    pub fn execute_sql_with_params(
2922        &self,
2923        query: &str,
2924        params: std::collections::HashMap<String, Value>,
2925    ) -> Result<QueryResult> {
2926        use crate::query::processor::{QueryLanguage, QueryProcessor};
2927
2928        #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2929        let start_time = Instant::now();
2930
2931        let has_mutations = Self::query_looks_like_mutation(query);
2932        let active = self.active_store();
2933
2934        let result = self.with_auto_commit(has_mutations, || {
2935            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2936            let processor = QueryProcessor::for_graph_store_with_transaction(
2937                Arc::clone(&active),
2938                Arc::clone(&self.transaction_manager),
2939            )?;
2940            let processor = if let Some(transaction_id) = transaction_id {
2941                processor.with_transaction_context(viewing_epoch, transaction_id)
2942            } else {
2943                processor
2944            };
2945            processor.process(query, QueryLanguage::SqlPgq, Some(&params))
2946        });
2947
2948        #[cfg(feature = "metrics")]
2949        {
2950            #[cfg(not(target_arch = "wasm32"))]
2951            let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
2952            #[cfg(target_arch = "wasm32")]
2953            let elapsed_ms = None;
2954            self.record_query_metrics("sql", elapsed_ms, &result);
2955        }
2956
2957        result
2958    }
2959
2960    /// Executes a query in the specified language by name.
2961    ///
2962    /// Supported language names: `"gql"`, `"cypher"`, `"gremlin"`, `"graphql"`,
2963    /// `"graphql-rdf"`, `"sparql"`, `"sql"`. Each requires the corresponding feature flag.
2964    ///
2965    /// # Errors
2966    ///
2967    /// Returns an error if the language is unknown/disabled or the query fails.
2968    pub fn execute_language(
2969        &self,
2970        query: &str,
2971        language: &str,
2972        params: Option<std::collections::HashMap<String, Value>>,
2973    ) -> Result<QueryResult> {
2974        let _span = grafeo_info_span!(
2975            "grafeo::session::execute",
2976            language,
2977            query_len = query.len(),
2978        );
2979        match language {
2980            "gql" => {
2981                if let Some(p) = params {
2982                    self.execute_with_params(query, p)
2983                } else {
2984                    self.execute(query)
2985                }
2986            }
2987            #[cfg(feature = "cypher")]
2988            "cypher" => {
2989                if let Some(p) = params {
2990                    use crate::query::processor::{QueryLanguage, QueryProcessor};
2991
2992                    #[cfg(all(feature = "metrics", not(target_arch = "wasm32")))]
2993                    let start_time = Instant::now();
2994
2995                    let has_mutations = Self::query_looks_like_mutation(query);
2996                    let active = self.active_store();
2997                    let result = self.with_auto_commit(has_mutations, || {
2998                        let processor = QueryProcessor::for_graph_store_with_transaction(
2999                            Arc::clone(&active),
3000                            Arc::clone(&self.transaction_manager),
3001                        )?;
3002                        let (viewing_epoch, transaction_id) = self.get_transaction_context();
3003                        let processor = if let Some(transaction_id) = transaction_id {
3004                            processor.with_transaction_context(viewing_epoch, transaction_id)
3005                        } else {
3006                            processor
3007                        };
3008                        processor.process(query, QueryLanguage::Cypher, Some(&p))
3009                    });
3010
3011                    #[cfg(feature = "metrics")]
3012                    {
3013                        #[cfg(not(target_arch = "wasm32"))]
3014                        let elapsed_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
3015                        #[cfg(target_arch = "wasm32")]
3016                        let elapsed_ms = None;
3017                        self.record_query_metrics("cypher", elapsed_ms, &result);
3018                    }
3019
3020                    result
3021                } else {
3022                    self.execute_cypher(query)
3023                }
3024            }
3025            #[cfg(feature = "gremlin")]
3026            "gremlin" => {
3027                if let Some(p) = params {
3028                    self.execute_gremlin_with_params(query, p)
3029                } else {
3030                    self.execute_gremlin(query)
3031                }
3032            }
3033            #[cfg(feature = "graphql")]
3034            "graphql" => {
3035                if let Some(p) = params {
3036                    self.execute_graphql_with_params(query, p)
3037                } else {
3038                    self.execute_graphql(query)
3039                }
3040            }
3041            #[cfg(all(feature = "graphql", feature = "rdf"))]
3042            "graphql-rdf" => {
3043                if let Some(p) = params {
3044                    self.execute_graphql_rdf_with_params(query, p)
3045                } else {
3046                    self.execute_graphql_rdf(query)
3047                }
3048            }
3049            #[cfg(feature = "sql-pgq")]
3050            "sql" | "sql-pgq" => {
3051                if let Some(p) = params {
3052                    self.execute_sql_with_params(query, p)
3053                } else {
3054                    self.execute_sql(query)
3055                }
3056            }
3057            #[cfg(all(feature = "sparql", feature = "rdf"))]
3058            "sparql" => {
3059                if let Some(p) = params {
3060                    self.execute_sparql_with_params(query, p)
3061                } else {
3062                    self.execute_sparql(query)
3063                }
3064            }
3065            other => Err(grafeo_common::utils::error::Error::Query(
3066                grafeo_common::utils::error::QueryError::new(
3067                    grafeo_common::utils::error::QueryErrorKind::Semantic,
3068                    format!("Unknown query language: '{other}'"),
3069                ),
3070            )),
3071        }
3072    }
3073
    /// Clears all cached query plans.
    ///
    /// The plan cache is shared across all sessions on the same database,
    /// so clearing from one session affects all sessions.
    pub fn clear_plan_cache(&self) {
        self.query_cache.clear();
    }

    /// Begins a new transaction on this session.
    ///
    /// Uses the default isolation level (`SnapshotIsolation`).
    ///
    /// If a transaction is already active, no second transaction is started:
    /// `begin_transaction_inner` records a nested auto-savepoint instead, and
    /// the matching [`commit`](Self::commit) or [`rollback`](Self::rollback)
    /// releases or rolls back to that savepoint.
    ///
    /// # Errors
    ///
    /// Returns an error if a transaction is already active and the nested
    /// auto-savepoint cannot be created.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use grafeo_engine::GrafeoDB;
    ///
    /// let db = GrafeoDB::new_in_memory();
    /// let mut session = db.session();
    ///
    /// session.begin_transaction()?;
    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
    /// session.execute("INSERT (:Person {name: 'Gus'})")?;
    /// session.commit()?; // Both inserts committed atomically
    /// # Ok(())
    /// # }
    /// ```
    pub fn begin_transaction(&mut self) -> Result<()> {
        self.begin_transaction_inner(false, None)
    }
3114
    /// Begins a transaction with a specific isolation level.
    ///
    /// See [`begin_transaction`](Self::begin_transaction) for the default (`SnapshotIsolation`).
    ///
    /// If a transaction is already active, `begin_transaction_inner` creates a
    /// nested auto-savepoint instead of a new transaction, and `isolation_level`
    /// is not consulted on that path.
    ///
    /// # Errors
    ///
    /// Returns an error if a transaction is already active and the nested
    /// auto-savepoint cannot be created.
    pub fn begin_transaction_with_isolation(
        &mut self,
        isolation_level: crate::transaction::IsolationLevel,
    ) -> Result<()> {
        // `false`: transactions started through this entry point are not read-only.
        self.begin_transaction_inner(false, Some(isolation_level))
    }
3128
3129    /// Core transaction begin logic, usable from both `&mut self` and `&self` paths.
3130    fn begin_transaction_inner(
3131        &self,
3132        read_only: bool,
3133        isolation_level: Option<crate::transaction::IsolationLevel>,
3134    ) -> Result<()> {
3135        let _span = grafeo_debug_span!("grafeo::tx::begin", read_only);
3136        let mut current = self.current_transaction.lock();
3137        if current.is_some() {
3138            // Nested transaction: create an auto-savepoint instead of a new tx.
3139            drop(current);
3140            let mut depth = self.transaction_nesting_depth.lock();
3141            *depth += 1;
3142            let sp_name = format!("_nested_tx_{}", *depth);
3143            self.savepoint(&sp_name)?;
3144            return Ok(());
3145        }
3146
3147        let active = self.active_lpg_store();
3148        self.transaction_start_node_count
3149            .store(active.node_count(), Ordering::Relaxed);
3150        self.transaction_start_edge_count
3151            .store(active.edge_count(), Ordering::Relaxed);
3152        let transaction_id = if let Some(level) = isolation_level {
3153            self.transaction_manager.begin_with_isolation(level)
3154        } else {
3155            self.transaction_manager.begin()
3156        };
3157        *current = Some(transaction_id);
3158        *self.read_only_tx.lock() = read_only || self.db_read_only;
3159
3160        // Record the initial graph as "touched" for cross-graph atomicity.
3161        // Uses the full storage key (schema/graph) for schema-scoped resolution.
3162        let key = self.active_graph_storage_key();
3163        let mut touched = self.touched_graphs.lock();
3164        touched.clear();
3165        touched.push(key);
3166
3167        #[cfg(feature = "metrics")]
3168        {
3169            crate::metrics::record_metric!(self.metrics, tx_active, inc);
3170            #[cfg(not(target_arch = "wasm32"))]
3171            {
3172                *self.tx_start_time.lock() = Some(Instant::now());
3173            }
3174        }
3175
3176        Ok(())
3177    }
3178
    /// Commits the current transaction.
    ///
    /// Makes all changes since [`begin_transaction`](Self::begin_transaction) permanent.
    ///
    /// When called inside a nested transaction (nesting depth > 0), only the
    /// innermost auto-savepoint is released; the outer transaction stays open.
    ///
    /// # Errors
    ///
    /// Returns an error if no transaction is active, if the nested
    /// auto-savepoint cannot be released, or if the transaction manager
    /// rejects the commit.
    pub fn commit(&mut self) -> Result<()> {
        // All logic lives in `commit_inner`, which only needs `&self`.
        self.commit_inner()
    }
3189
    /// Core commit logic, usable from both `&mut self` and `&self` paths.
    ///
    /// * **Nesting depth > 0:** decrements the depth and releases the
    ///   `_nested_tx_{depth}` auto-savepoint; nothing is committed to the
    ///   transaction manager.
    /// * **Nesting depth == 0:** takes the session's current transaction and
    ///   commits it through the transaction manager. On success, every touched
    ///   graph store has its version epochs finalized, its property undo log
    ///   committed, and its epoch synced. On failure, the touched stores'
    ///   property changes are rolled back and the error is returned.
    ///
    /// Either way at depth 0, the read-only flag is reset to `db_read_only` and
    /// the savepoint and touched-graph lists are cleared. Every `gc_interval`
    /// successful commits (when `gc_interval > 0`), old versions are garbage
    /// collected on the touched stores and in the transaction manager.
    ///
    /// # Errors
    ///
    /// Returns `TransactionError::InvalidState` if no transaction is active,
    /// the error from `release_savepoint` on the nested path, or the error
    /// returned by the transaction manager's `commit`.
    fn commit_inner(&self) -> Result<()> {
        let _span = grafeo_debug_span!("grafeo::tx::commit");
        // Nested transaction: release the auto-savepoint (changes are preserved).
        {
            let mut depth = self.transaction_nesting_depth.lock();
            if *depth > 0 {
                // Name uses the depth value before it is decremented.
                let sp_name = format!("_nested_tx_{depth}");
                *depth -= 1;
                // Release the depth lock before touching the savepoint stack.
                drop(depth);
                return self.release_savepoint(&sp_name);
            }
        }

        // `take()` clears the session's current transaction up front, so both
        // the success and the failure path below leave the session without one.
        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
            grafeo_common::utils::error::Error::Transaction(
                grafeo_common::utils::error::TransactionError::InvalidState(
                    "No active transaction".to_string(),
                ),
            )
        })?;

        // Validate the transaction first (conflict detection) before committing data.
        // If this fails, we rollback the data changes instead of making them permanent.
        // `touched` is a private copy: the shared list is cleared further down
        // while this copy is still needed for finalization and GC.
        let touched = self.touched_graphs.lock().clone();
        let commit_epoch = match self.transaction_manager.commit(transaction_id) {
            Ok(epoch) => epoch,
            Err(e) => {
                // Conflict detected: rollback the data changes
                for graph_name in &touched {
                    let store = self.resolve_store(graph_name);
                    store.rollback_transaction_properties(transaction_id);
                }
                #[cfg(feature = "rdf")]
                self.rollback_rdf_transaction(transaction_id);
                // Reset session-level transaction state before reporting the error.
                *self.read_only_tx.lock() = self.db_read_only;
                self.savepoints.lock().clear();
                self.touched_graphs.lock().clear();
                #[cfg(feature = "metrics")]
                {
                    crate::metrics::record_metric!(self.metrics, tx_active, dec);
                    crate::metrics::record_metric!(self.metrics, tx_conflicts, inc);
                    #[cfg(not(target_arch = "wasm32"))]
                    if let Some(start) = self.tx_start_time.lock().take() {
                        let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
                        crate::metrics::record_metric!(
                            self.metrics,
                            tx_duration,
                            observe duration_ms
                        );
                    }
                }
                return Err(e);
            }
        };

        // Finalize PENDING epochs: make uncommitted versions visible at the commit epoch.
        for graph_name in &touched {
            let store = self.resolve_store(graph_name);
            store.finalize_version_epochs(transaction_id, commit_epoch);
        }

        // Commit succeeded: discard undo logs (make changes permanent)
        #[cfg(feature = "rdf")]
        self.commit_rdf_transaction(transaction_id);

        for graph_name in &touched {
            let store = self.resolve_store(graph_name);
            store.commit_transaction_properties(transaction_id);
        }

        // Sync epoch for all touched graphs so that convenience lookups
        // (edge_type, get_edge, get_node) can see versions at the latest epoch.
        let current_epoch = self.transaction_manager.current_epoch();
        for graph_name in &touched {
            let store = self.resolve_store(graph_name);
            store.sync_epoch(current_epoch);
        }

        // Reset read-only flag, clear savepoints and touched graphs
        *self.read_only_tx.lock() = self.db_read_only;
        self.savepoints.lock().clear();
        self.touched_graphs.lock().clear();

        // Auto-GC: periodically prune old MVCC versions
        if self.gc_interval > 0 {
            // `fetch_add` returns the previous value; `+ 1` gives the count
            // including this commit.
            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
            if count.is_multiple_of(self.gc_interval) {
                let min_epoch = self.transaction_manager.min_active_epoch();
                // Only the graphs touched by this transaction are collected here.
                for graph_name in &touched {
                    let store = self.resolve_store(graph_name);
                    store.gc_versions(min_epoch);
                }
                self.transaction_manager.gc();
                #[cfg(feature = "metrics")]
                crate::metrics::record_metric!(self.metrics, gc_runs, inc);
            }
        }

        #[cfg(feature = "metrics")]
        {
            crate::metrics::record_metric!(self.metrics, tx_active, dec);
            crate::metrics::record_metric!(self.metrics, tx_committed, inc);
            #[cfg(not(target_arch = "wasm32"))]
            if let Some(start) = self.tx_start_time.lock().take() {
                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
            }
        }

        Ok(())
    }
3302
    /// Aborts the current transaction.
    ///
    /// Discards all changes since [`begin_transaction`](Self::begin_transaction).
    ///
    /// When called inside a nested transaction (nesting depth > 0), only the
    /// changes made since the innermost auto-savepoint are rolled back; the
    /// outer transaction stays open.
    ///
    /// # Errors
    ///
    /// Returns an error if no transaction is active, if rolling back to the
    /// nested auto-savepoint fails, or if the transaction manager fails to
    /// abort the transaction.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use grafeo_engine::GrafeoDB;
    ///
    /// let db = GrafeoDB::new_in_memory();
    /// let mut session = db.session();
    ///
    /// session.begin_transaction()?;
    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
    /// session.rollback()?; // Insert is discarded
    /// # Ok(())
    /// # }
    /// ```
    pub fn rollback(&mut self) -> Result<()> {
        // All logic lives in `rollback_inner`, which only needs `&self`.
        self.rollback_inner()
    }
3329
3330    /// Core rollback logic, usable from both `&mut self` and `&self` paths.
3331    fn rollback_inner(&self) -> Result<()> {
3332        let _span = grafeo_debug_span!("grafeo::tx::rollback");
3333        // Nested transaction: rollback to the auto-savepoint.
3334        {
3335            let mut depth = self.transaction_nesting_depth.lock();
3336            if *depth > 0 {
3337                let sp_name = format!("_nested_tx_{depth}");
3338                *depth -= 1;
3339                drop(depth);
3340                return self.rollback_to_savepoint(&sp_name);
3341            }
3342        }
3343
3344        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3345            grafeo_common::utils::error::Error::Transaction(
3346                grafeo_common::utils::error::TransactionError::InvalidState(
3347                    "No active transaction".to_string(),
3348                ),
3349            )
3350        })?;
3351
3352        // Reset read-only flag
3353        *self.read_only_tx.lock() = self.db_read_only;
3354
3355        // Discard uncommitted versions in ALL touched LPG stores (cross-graph atomicity).
3356        let touched = self.touched_graphs.lock().clone();
3357        for graph_name in &touched {
3358            let store = self.resolve_store(graph_name);
3359            store.discard_uncommitted_versions(transaction_id);
3360        }
3361
3362        // Discard pending operations in the RDF store
3363        #[cfg(feature = "rdf")]
3364        self.rollback_rdf_transaction(transaction_id);
3365
3366        // Clear savepoints and touched graphs
3367        self.savepoints.lock().clear();
3368        self.touched_graphs.lock().clear();
3369
3370        // Mark transaction as aborted in the manager
3371        let result = self.transaction_manager.abort(transaction_id);
3372
3373        #[cfg(feature = "metrics")]
3374        if result.is_ok() {
3375            crate::metrics::record_metric!(self.metrics, tx_active, dec);
3376            crate::metrics::record_metric!(self.metrics, tx_rolled_back, inc);
3377            #[cfg(not(target_arch = "wasm32"))]
3378            if let Some(start) = self.tx_start_time.lock().take() {
3379                let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
3380                crate::metrics::record_metric!(self.metrics, tx_duration, observe duration_ms);
3381            }
3382        }
3383
3384        result
3385    }
3386
3387    /// Creates a named savepoint within the current transaction.
3388    ///
3389    /// The savepoint captures the current node/edge ID counters so that
3390    /// [`rollback_to_savepoint`](Self::rollback_to_savepoint) can discard
3391    /// entities created after this point.
3392    ///
3393    /// # Errors
3394    ///
3395    /// Returns an error if no transaction is active.
3396    pub fn savepoint(&self, name: &str) -> Result<()> {
3397        let tx_id = self.current_transaction.lock().ok_or_else(|| {
3398            grafeo_common::utils::error::Error::Transaction(
3399                grafeo_common::utils::error::TransactionError::InvalidState(
3400                    "No active transaction".to_string(),
3401                ),
3402            )
3403        })?;
3404
3405        // Capture state for every graph touched so far.
3406        let touched = self.touched_graphs.lock().clone();
3407        let graph_snapshots: Vec<GraphSavepoint> = touched
3408            .iter()
3409            .map(|graph_name| {
3410                let store = self.resolve_store(graph_name);
3411                GraphSavepoint {
3412                    graph_name: graph_name.clone(),
3413                    next_node_id: store.peek_next_node_id(),
3414                    next_edge_id: store.peek_next_edge_id(),
3415                    undo_log_position: store.property_undo_log_position(tx_id),
3416                }
3417            })
3418            .collect();
3419
3420        self.savepoints.lock().push(SavepointState {
3421            name: name.to_string(),
3422            graph_snapshots,
3423            active_graph: self.current_graph.lock().clone(),
3424        });
3425        Ok(())
3426    }
3427
3428    /// Rolls back to a named savepoint, undoing all writes made after it.
3429    ///
3430    /// The savepoint and any savepoints created after it are removed.
3431    /// Entities with IDs >= the savepoint snapshot are discarded.
3432    ///
3433    /// # Errors
3434    ///
3435    /// Returns an error if no transaction is active or the savepoint does not exist.
3436    pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
3437        let transaction_id = self.current_transaction.lock().ok_or_else(|| {
3438            grafeo_common::utils::error::Error::Transaction(
3439                grafeo_common::utils::error::TransactionError::InvalidState(
3440                    "No active transaction".to_string(),
3441                ),
3442            )
3443        })?;
3444
3445        let mut savepoints = self.savepoints.lock();
3446
3447        // Find the savepoint by name (search from the end for nested savepoints)
3448        let pos = savepoints
3449            .iter()
3450            .rposition(|sp| sp.name == name)
3451            .ok_or_else(|| {
3452                grafeo_common::utils::error::Error::Transaction(
3453                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
3454                        "Savepoint '{name}' not found"
3455                    )),
3456                )
3457            })?;
3458
3459        let sp_state = savepoints[pos].clone();
3460
3461        // Remove this savepoint and all later ones
3462        savepoints.truncate(pos);
3463        drop(savepoints);
3464
3465        // Roll back each graph that was captured in the savepoint.
3466        for gs in &sp_state.graph_snapshots {
3467            let store = self.resolve_store(&gs.graph_name);
3468
3469            // Replay property/label undo entries recorded after the savepoint
3470            store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);
3471
3472            // Discard entities created after the savepoint
3473            let current_next_node = store.peek_next_node_id();
3474            let current_next_edge = store.peek_next_edge_id();
3475
3476            let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
3477                .map(NodeId::new)
3478                .collect();
3479            let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
3480                .map(EdgeId::new)
3481                .collect();
3482
3483            if !node_ids.is_empty() || !edge_ids.is_empty() {
3484                store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
3485            }
3486        }
3487
3488        // Also roll back any graphs that were touched AFTER the savepoint
3489        // but not captured in it. These need full discard since the savepoint
3490        // didn't include them.
3491        let touched = self.touched_graphs.lock().clone();
3492        for graph_name in &touched {
3493            let already_captured = sp_state
3494                .graph_snapshots
3495                .iter()
3496                .any(|gs| gs.graph_name == *graph_name);
3497            if !already_captured {
3498                let store = self.resolve_store(graph_name);
3499                store.discard_uncommitted_versions(transaction_id);
3500            }
3501        }
3502
3503        // Restore touched_graphs to only the graphs that were known at savepoint time.
3504        let mut touched = self.touched_graphs.lock();
3505        touched.clear();
3506        for gs in &sp_state.graph_snapshots {
3507            if !touched.contains(&gs.graph_name) {
3508                touched.push(gs.graph_name.clone());
3509            }
3510        }
3511
3512        Ok(())
3513    }
3514
3515    /// Releases (removes) a named savepoint without rolling back.
3516    ///
3517    /// # Errors
3518    ///
3519    /// Returns an error if no transaction is active or the savepoint does not exist.
3520    pub fn release_savepoint(&self, name: &str) -> Result<()> {
3521        let _tx_id = self.current_transaction.lock().ok_or_else(|| {
3522            grafeo_common::utils::error::Error::Transaction(
3523                grafeo_common::utils::error::TransactionError::InvalidState(
3524                    "No active transaction".to_string(),
3525                ),
3526            )
3527        })?;
3528
3529        let mut savepoints = self.savepoints.lock();
3530        let pos = savepoints
3531            .iter()
3532            .rposition(|sp| sp.name == name)
3533            .ok_or_else(|| {
3534                grafeo_common::utils::error::Error::Transaction(
3535                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
3536                        "Savepoint '{name}' not found"
3537                    )),
3538                )
3539            })?;
3540        savepoints.remove(pos);
3541        Ok(())
3542    }
3543
    /// Returns whether a transaction is active.
    ///
    /// This is `true` exactly while the session holds a current transaction
    /// ID, which also covers the time spent inside nested transactions.
    #[must_use]
    pub fn in_transaction(&self) -> bool {
        self.current_transaction.lock().is_some()
    }
3549
    /// Returns the current transaction ID, if any.
    ///
    /// The ID is copied out while the lock is held; `None` means no
    /// transaction is active on this session.
    #[must_use]
    pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
        *self.current_transaction.lock()
    }
3555
    /// Returns a reference to the transaction manager.
    ///
    /// The reference borrows from this session and is only visible inside
    /// the crate.
    #[must_use]
    pub(crate) fn transaction_manager(&self) -> &TransactionManager {
        &self.transaction_manager
    }
3561
    /// Returns `(count_at_transaction_start, current_count)` for nodes.
    ///
    /// The first element is the node count recorded when the transaction
    /// began; the second is the active LPG store's node count right now.
    #[must_use]
    pub(crate) fn node_count_delta(&self) -> (usize, usize) {
        (
            // Baseline stored by `begin_transaction_inner`.
            self.transaction_start_node_count.load(Ordering::Relaxed),
            self.active_lpg_store().node_count(),
        )
    }
3570
    /// Returns `(count_at_transaction_start, current_count)` for edges.
    ///
    /// The first element is the edge count recorded when the transaction
    /// began; the second is the active LPG store's edge count right now.
    #[must_use]
    pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
        (
            // Baseline stored by `begin_transaction_inner`.
            self.transaction_start_edge_count.load(Ordering::Relaxed),
            self.active_lpg_store().edge_count(),
        )
    }
3579
    /// Prepares the current transaction for a two-phase commit.
    ///
    /// Returns a [`PreparedCommit`](crate::transaction::PreparedCommit) that
    /// lets you inspect pending changes and attach metadata before finalizing.
    /// The mutable borrow prevents concurrent operations while the commit is
    /// pending.
    ///
    /// If the `PreparedCommit` is dropped without calling `commit()` or
    /// `abort()`, the transaction is automatically rolled back.
    ///
    /// This method is a thin wrapper: construction and validation are done by
    /// `PreparedCommit::new`.
    ///
    /// # Errors
    ///
    /// Returns an error if no transaction is active.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use grafeo_engine::GrafeoDB;
    ///
    /// let db = GrafeoDB::new_in_memory();
    /// let mut session = db.session();
    ///
    /// session.begin_transaction()?;
    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
    ///
    /// let mut prepared = session.prepare_commit()?;
    /// println!("Nodes written: {}", prepared.info().nodes_written);
    /// prepared.set_metadata("audit_user", "admin");
    /// prepared.commit()?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
        // The returned value borrows `self` mutably for its whole lifetime.
        crate::transaction::PreparedCommit::new(self)
    }
3616
    /// Sets auto-commit mode.
    ///
    /// While enabled, mutating queries issued outside an explicit transaction
    /// are wrapped in their own begin/commit by `with_auto_commit`.
    pub fn set_auto_commit(&mut self, auto_commit: bool) {
        self.auto_commit = auto_commit;
    }
3621
3622    /// Returns whether auto-commit is enabled.
3623    #[must_use]
3624    pub fn auto_commit(&self) -> bool {
3625        self.auto_commit
3626    }
3627
3628    /// Returns `true` if auto-commit should wrap this execution.
3629    ///
3630    /// Auto-commit kicks in when: the session is in auto-commit mode,
3631    /// no explicit transaction is active, and the query mutates data.
3632    fn needs_auto_commit(&self, has_mutations: bool) -> bool {
3633        self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
3634    }
3635
3636    /// Wraps `body` in an automatic begin/commit when [`needs_auto_commit`]
3637    /// returns `true`. On error the transaction is rolled back.
3638    fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
3639    where
3640        F: FnOnce() -> Result<QueryResult>,
3641    {
3642        if self.needs_auto_commit(has_mutations) {
3643            self.begin_transaction_inner(false, None)?;
3644            match body() {
3645                Ok(result) => {
3646                    self.commit_inner()?;
3647                    Ok(result)
3648                }
3649                Err(e) => {
3650                    let _ = self.rollback_inner();
3651                    Err(e)
3652                }
3653            }
3654        } else {
3655            body()
3656        }
3657    }
3658
3659    /// Quick heuristic: returns `true` when the query text looks like it
3660    /// performs a mutation. Used by `_with_params` paths that go through the
3661    /// `QueryProcessor` (where the logical plan isn't available before
3662    /// execution). False negatives are harmless: the data just won't be
3663    /// auto-committed, which matches the prior behaviour.
3664    fn query_looks_like_mutation(query: &str) -> bool {
3665        let upper = query.to_ascii_uppercase();
3666        upper.contains("INSERT")
3667            || upper.contains("CREATE")
3668            || upper.contains("DELETE")
3669            || upper.contains("MERGE")
3670            || upper.contains("SET")
3671            || upper.contains("REMOVE")
3672            || upper.contains("DROP")
3673            || upper.contains("ALTER")
3674    }
3675
3676    /// Computes the wall-clock deadline for query execution.
3677    #[must_use]
3678    fn query_deadline(&self) -> Option<Instant> {
3679        #[cfg(not(target_arch = "wasm32"))]
3680        {
3681            self.query_timeout.map(|d| Instant::now() + d)
3682        }
3683        #[cfg(target_arch = "wasm32")]
3684        {
3685            let _ = &self.query_timeout;
3686            None
3687        }
3688    }
3689
3690    /// Records query metrics for any language.
3691    ///
3692    /// Called after query execution to update counters, latency histogram,
3693    /// and per-language tracking. `elapsed_ms` should be `None` on WASM
3694    /// where `Instant` is unavailable.
3695    #[cfg(feature = "metrics")]
3696    fn record_query_metrics(
3697        &self,
3698        language: &str,
3699        elapsed_ms: Option<f64>,
3700        result: &Result<crate::database::QueryResult>,
3701    ) {
3702        use crate::metrics::record_metric;
3703
3704        record_metric!(self.metrics, query_count, inc);
3705        if let Some(ref reg) = self.metrics {
3706            reg.query_count_by_language.increment(language);
3707        }
3708        if let Some(ms) = elapsed_ms {
3709            record_metric!(self.metrics, query_latency, observe ms);
3710        }
3711        match result {
3712            Ok(r) => {
3713                let returned = r.rows.len() as u64;
3714                record_metric!(self.metrics, rows_returned, add returned);
3715                if let Some(scanned) = r.rows_scanned {
3716                    record_metric!(self.metrics, rows_scanned, add scanned);
3717                }
3718            }
3719            Err(e) => {
3720                record_metric!(self.metrics, query_errors, inc);
3721                // Detect timeout errors
3722                let msg = e.to_string();
3723                if msg.contains("exceeded timeout") {
3724                    record_metric!(self.metrics, query_timeouts, inc);
3725                }
3726            }
3727        }
3728    }
3729
3730    /// Evaluates a simple integer literal from a session parameter expression.
3731    fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
3732        use grafeo_adapters::query::gql::ast::{Expression, Literal};
3733        match expr {
3734            Expression::Literal(Literal::Integer(n)) => Some(*n),
3735            _ => None,
3736        }
3737    }
3738
3739    /// Returns the current transaction context for MVCC visibility.
3740    ///
3741    /// Returns `(viewing_epoch, transaction_id)` where:
3742    /// - `viewing_epoch` is the epoch at which to check version visibility
3743    /// - `transaction_id` is the current transaction ID (if in a transaction)
3744    #[must_use]
3745    fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
3746        // Time-travel override takes precedence (read-only, no tx context)
3747        if let Some(epoch) = *self.viewing_epoch_override.lock() {
3748            return (epoch, None);
3749        }
3750
3751        if let Some(transaction_id) = *self.current_transaction.lock() {
3752            // In a transaction: use the transaction's start epoch
3753            let epoch = self
3754                .transaction_manager
3755                .start_epoch(transaction_id)
3756                .unwrap_or_else(|| self.transaction_manager.current_epoch());
3757            (epoch, Some(transaction_id))
3758        } else {
3759            // No transaction: use current epoch
3760            (self.transaction_manager.current_epoch(), None)
3761        }
3762    }
3763
3764    /// Creates a planner with transaction context and constraint validator.
3765    ///
3766    /// The `store` parameter is the graph store to plan against (use
3767    /// `self.active_store()` for graph-aware execution).
3768    fn create_planner_for_store(
3769        &self,
3770        store: Arc<dyn GraphStoreMut>,
3771        viewing_epoch: EpochId,
3772        transaction_id: Option<TransactionId>,
3773    ) -> crate::query::Planner {
3774        self.create_planner_for_store_with_read_only(store, viewing_epoch, transaction_id, false)
3775    }
3776
3777    fn create_planner_for_store_with_read_only(
3778        &self,
3779        store: Arc<dyn GraphStoreMut>,
3780        viewing_epoch: EpochId,
3781        transaction_id: Option<TransactionId>,
3782        read_only: bool,
3783    ) -> crate::query::Planner {
3784        use crate::query::Planner;
3785        use grafeo_core::execution::operators::{LazyValue, SessionContext};
3786
3787        // Capture store reference for lazy introspection (only computed if info()/schema() called).
3788        let info_store = Arc::clone(&store);
3789        let schema_store = Arc::clone(&store);
3790
3791        let session_context = SessionContext {
3792            current_schema: self.current_schema(),
3793            current_graph: self.current_graph(),
3794            db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
3795            schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
3796        };
3797
3798        let mut planner = Planner::with_context(
3799            Arc::clone(&store),
3800            Arc::clone(&self.transaction_manager),
3801            transaction_id,
3802            viewing_epoch,
3803        )
3804        .with_factorized_execution(self.factorized_execution)
3805        .with_catalog(Arc::clone(&self.catalog))
3806        .with_session_context(session_context)
3807        .with_read_only(read_only);
3808
3809        // Attach the constraint validator for schema enforcement
3810        let validator =
3811            CatalogConstraintValidator::new(Arc::clone(&self.catalog)).with_store(store);
3812        planner = planner.with_validator(Arc::new(validator));
3813
3814        planner
3815    }
3816
3817    /// Builds a `Value::Map` for the `info()` introspection function.
3818    fn build_info_value(store: &dyn GraphStoreMut) -> Value {
3819        use grafeo_common::types::PropertyKey;
3820        use std::collections::BTreeMap;
3821
3822        let mut map = BTreeMap::new();
3823        map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
3824        map.insert(
3825            PropertyKey::from("node_count"),
3826            Value::Int64(store.node_count() as i64),
3827        );
3828        map.insert(
3829            PropertyKey::from("edge_count"),
3830            Value::Int64(store.edge_count() as i64),
3831        );
3832        map.insert(
3833            PropertyKey::from("version"),
3834            Value::String(env!("CARGO_PKG_VERSION").into()),
3835        );
3836        Value::Map(map.into())
3837    }
3838
3839    /// Builds a `Value::Map` for the `schema()` introspection function.
3840    fn build_schema_value(store: &dyn GraphStoreMut) -> Value {
3841        use grafeo_common::types::PropertyKey;
3842        use std::collections::BTreeMap;
3843
3844        let labels: Vec<Value> = store
3845            .all_labels()
3846            .into_iter()
3847            .map(|l| Value::String(l.into()))
3848            .collect();
3849        let edge_types: Vec<Value> = store
3850            .all_edge_types()
3851            .into_iter()
3852            .map(|t| Value::String(t.into()))
3853            .collect();
3854        let property_keys: Vec<Value> = store
3855            .all_property_keys()
3856            .into_iter()
3857            .map(|k| Value::String(k.into()))
3858            .collect();
3859
3860        let mut map = BTreeMap::new();
3861        map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
3862        map.insert(
3863            PropertyKey::from("edge_types"),
3864            Value::List(edge_types.into()),
3865        );
3866        map.insert(
3867            PropertyKey::from("property_keys"),
3868            Value::List(property_keys.into()),
3869        );
3870        Value::Map(map.into())
3871    }
3872
3873    /// Creates a node directly (bypassing query execution).
3874    ///
3875    /// This is a low-level API for testing and direct manipulation.
3876    /// If a transaction is active, the node will be versioned with the transaction ID.
3877    pub fn create_node(&self, labels: &[&str]) -> NodeId {
3878        let (epoch, transaction_id) = self.get_transaction_context();
3879        self.active_lpg_store().create_node_versioned(
3880            labels,
3881            epoch,
3882            transaction_id.unwrap_or(TransactionId::SYSTEM),
3883        )
3884    }
3885
3886    /// Creates a node with properties.
3887    ///
3888    /// If a transaction is active, the node will be versioned with the transaction ID.
3889    pub fn create_node_with_props<'a>(
3890        &self,
3891        labels: &[&str],
3892        properties: impl IntoIterator<Item = (&'a str, Value)>,
3893    ) -> NodeId {
3894        let (epoch, transaction_id) = self.get_transaction_context();
3895        self.active_lpg_store().create_node_with_props_versioned(
3896            labels,
3897            properties,
3898            epoch,
3899            transaction_id.unwrap_or(TransactionId::SYSTEM),
3900        )
3901    }
3902
3903    /// Creates an edge between two nodes.
3904    ///
3905    /// This is a low-level API for testing and direct manipulation.
3906    /// If a transaction is active, the edge will be versioned with the transaction ID.
3907    pub fn create_edge(
3908        &self,
3909        src: NodeId,
3910        dst: NodeId,
3911        edge_type: &str,
3912    ) -> grafeo_common::types::EdgeId {
3913        let (epoch, transaction_id) = self.get_transaction_context();
3914        self.active_lpg_store().create_edge_versioned(
3915            src,
3916            dst,
3917            edge_type,
3918            epoch,
3919            transaction_id.unwrap_or(TransactionId::SYSTEM),
3920        )
3921    }
3922
3923    // =========================================================================
3924    // Direct Lookup APIs (bypass query planning for O(1) point reads)
3925    // =========================================================================
3926
3927    /// Gets a node by ID directly, bypassing query planning.
3928    ///
3929    /// This is the fastest way to retrieve a single node when you know its ID.
3930    /// Skips parsing, binding, optimization, and physical planning entirely.
3931    ///
3932    /// # Performance
3933    ///
3934    /// - Time complexity: O(1) average case
3935    /// - No lock contention (uses DashMap internally)
3936    /// - ~20-30x faster than equivalent MATCH query
3937    ///
3938    /// # Example
3939    ///
3940    /// ```no_run
3941    /// # use grafeo_engine::GrafeoDB;
3942    /// # let db = GrafeoDB::new_in_memory();
3943    /// let session = db.session();
3944    /// let node_id = session.create_node(&["Person"]);
3945    ///
3946    /// // Direct lookup - O(1), no query planning
3947    /// let node = session.get_node(node_id);
3948    /// assert!(node.is_some());
3949    /// ```
3950    #[must_use]
3951    pub fn get_node(&self, id: NodeId) -> Option<Node> {
3952        let (epoch, transaction_id) = self.get_transaction_context();
3953        self.active_lpg_store().get_node_versioned(
3954            id,
3955            epoch,
3956            transaction_id.unwrap_or(TransactionId::SYSTEM),
3957        )
3958    }
3959
3960    /// Gets a single property from a node by ID, bypassing query planning.
3961    ///
3962    /// More efficient than `get_node()` when you only need one property,
3963    /// as it avoids loading the full node with all properties.
3964    ///
3965    /// # Performance
3966    ///
3967    /// - Time complexity: O(1) average case
3968    /// - No query planning overhead
3969    ///
3970    /// # Example
3971    ///
3972    /// ```no_run
3973    /// # use grafeo_engine::GrafeoDB;
3974    /// # use grafeo_common::types::Value;
3975    /// # let db = GrafeoDB::new_in_memory();
3976    /// let session = db.session();
3977    /// let id = session.create_node_with_props(&["Person"], [("name", "Alix".into())]);
3978    ///
3979    /// // Direct property access - O(1)
3980    /// let name = session.get_node_property(id, "name");
3981    /// assert_eq!(name, Some(Value::String("Alix".into())));
3982    /// ```
3983    #[must_use]
3984    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
3985        self.get_node(id)
3986            .and_then(|node| node.get_property(key).cloned())
3987    }
3988
3989    /// Gets an edge by ID directly, bypassing query planning.
3990    ///
3991    /// # Performance
3992    ///
3993    /// - Time complexity: O(1) average case
3994    /// - No lock contention
3995    #[must_use]
3996    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
3997        let (epoch, transaction_id) = self.get_transaction_context();
3998        self.active_lpg_store().get_edge_versioned(
3999            id,
4000            epoch,
4001            transaction_id.unwrap_or(TransactionId::SYSTEM),
4002        )
4003    }
4004
4005    /// Gets outgoing neighbors of a node directly, bypassing query planning.
4006    ///
4007    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
4008    ///
4009    /// # Performance
4010    ///
4011    /// - Time complexity: O(degree) where degree is the number of outgoing edges
4012    /// - Uses adjacency index for direct access
4013    /// - ~10-20x faster than equivalent MATCH query
4014    ///
4015    /// # Example
4016    ///
4017    /// ```no_run
4018    /// # use grafeo_engine::GrafeoDB;
4019    /// # let db = GrafeoDB::new_in_memory();
4020    /// let session = db.session();
4021    /// let alix = session.create_node(&["Person"]);
4022    /// let gus = session.create_node(&["Person"]);
4023    /// session.create_edge(alix, gus, "KNOWS");
4024    ///
4025    /// // Direct neighbor lookup - O(degree)
4026    /// let neighbors = session.get_neighbors_outgoing(alix);
4027    /// assert_eq!(neighbors.len(), 1);
4028    /// assert_eq!(neighbors[0].0, gus);
4029    /// ```
4030    #[must_use]
4031    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4032        self.active_lpg_store()
4033            .edges_from(node, Direction::Outgoing)
4034            .collect()
4035    }
4036
4037    /// Gets incoming neighbors of a node directly, bypassing query planning.
4038    ///
4039    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
4040    ///
4041    /// # Performance
4042    ///
4043    /// - Time complexity: O(degree) where degree is the number of incoming edges
4044    /// - Uses backward adjacency index for direct access
4045    #[must_use]
4046    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
4047        self.active_lpg_store()
4048            .edges_from(node, Direction::Incoming)
4049            .collect()
4050    }
4051
4052    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
4053    ///
4054    /// # Example
4055    ///
4056    /// ```no_run
4057    /// # use grafeo_engine::GrafeoDB;
4058    /// # let db = GrafeoDB::new_in_memory();
4059    /// # let session = db.session();
4060    /// # let alix = session.create_node(&["Person"]);
4061    /// let neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
4062    /// ```
4063    #[must_use]
4064    pub fn get_neighbors_outgoing_by_type(
4065        &self,
4066        node: NodeId,
4067        edge_type: &str,
4068    ) -> Vec<(NodeId, EdgeId)> {
4069        self.active_lpg_store()
4070            .edges_from(node, Direction::Outgoing)
4071            .filter(|(_, edge_id)| {
4072                self.get_edge(*edge_id)
4073                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
4074            })
4075            .collect()
4076    }
4077
4078    /// Checks if a node exists, bypassing query planning.
4079    ///
4080    /// # Performance
4081    ///
4082    /// - Time complexity: O(1)
4083    /// - Fastest existence check available
4084    #[must_use]
4085    pub fn node_exists(&self, id: NodeId) -> bool {
4086        self.get_node(id).is_some()
4087    }
4088
4089    /// Checks if an edge exists, bypassing query planning.
4090    #[must_use]
4091    pub fn edge_exists(&self, id: EdgeId) -> bool {
4092        self.get_edge(id).is_some()
4093    }
4094
4095    /// Gets the degree (number of edges) of a node.
4096    ///
4097    /// Returns (outgoing_degree, incoming_degree).
4098    #[must_use]
4099    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
4100        let active = self.active_lpg_store();
4101        let out = active.out_degree(node);
4102        let in_degree = active.in_degree(node);
4103        (out, in_degree)
4104    }
4105
4106    /// Batch lookup of multiple nodes by ID.
4107    ///
4108    /// More efficient than calling `get_node()` in a loop because it
4109    /// amortizes overhead.
4110    ///
4111    /// # Performance
4112    ///
4113    /// - Time complexity: O(n) where n is the number of IDs
4114    /// - Better cache utilization than individual lookups
4115    #[must_use]
4116    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
4117        let (epoch, transaction_id) = self.get_transaction_context();
4118        let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
4119        let active = self.active_lpg_store();
4120        ids.iter()
4121            .map(|&id| active.get_node_versioned(id, epoch, tx))
4122            .collect()
4123    }
4124
4125    // ── Change Data Capture ─────────────────────────────────────────────
4126
4127    /// Returns the full change history for an entity (node or edge).
4128    #[cfg(feature = "cdc")]
4129    pub fn history(
4130        &self,
4131        entity_id: impl Into<crate::cdc::EntityId>,
4132    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4133        Ok(self.cdc_log.history(entity_id.into()))
4134    }
4135
4136    /// Returns change events for an entity since the given epoch.
4137    #[cfg(feature = "cdc")]
4138    pub fn history_since(
4139        &self,
4140        entity_id: impl Into<crate::cdc::EntityId>,
4141        since_epoch: EpochId,
4142    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4143        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
4144    }
4145
4146    /// Returns all change events across all entities in an epoch range.
4147    #[cfg(feature = "cdc")]
4148    pub fn changes_between(
4149        &self,
4150        start_epoch: EpochId,
4151        end_epoch: EpochId,
4152    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
4153        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
4154    }
4155}
4156
4157impl Drop for Session {
4158    fn drop(&mut self) {
4159        // Auto-rollback any active transaction to prevent leaked MVCC state,
4160        // dangling write locks, and uncommitted versions lingering in the store.
4161        if self.in_transaction() {
4162            let _ = self.rollback_inner();
4163        }
4164
4165        #[cfg(feature = "metrics")]
4166        if let Some(ref reg) = self.metrics {
4167            reg.session_active
4168                .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
4169        }
4170    }
4171}
4172
4173#[cfg(test)]
4174mod tests {
4175    use super::parse_default_literal;
4176    use crate::database::GrafeoDB;
4177    use grafeo_common::types::Value;
4178
4179    // -----------------------------------------------------------------------
4180    // parse_default_literal
4181    // -----------------------------------------------------------------------
4182
4183    #[test]
4184    fn parse_default_literal_null() {
4185        assert_eq!(parse_default_literal("null"), Value::Null);
4186        assert_eq!(parse_default_literal("NULL"), Value::Null);
4187        assert_eq!(parse_default_literal("Null"), Value::Null);
4188    }
4189
4190    #[test]
4191    fn parse_default_literal_bool() {
4192        assert_eq!(parse_default_literal("true"), Value::Bool(true));
4193        assert_eq!(parse_default_literal("TRUE"), Value::Bool(true));
4194        assert_eq!(parse_default_literal("false"), Value::Bool(false));
4195        assert_eq!(parse_default_literal("FALSE"), Value::Bool(false));
4196    }
4197
4198    #[test]
4199    fn parse_default_literal_string_single_quoted() {
4200        assert_eq!(
4201            parse_default_literal("'hello'"),
4202            Value::String("hello".into())
4203        );
4204    }
4205
4206    #[test]
4207    fn parse_default_literal_string_double_quoted() {
4208        assert_eq!(
4209            parse_default_literal("\"world\""),
4210            Value::String("world".into())
4211        );
4212    }
4213
4214    #[test]
4215    fn parse_default_literal_integer() {
4216        assert_eq!(parse_default_literal("42"), Value::Int64(42));
4217        assert_eq!(parse_default_literal("-7"), Value::Int64(-7));
4218        assert_eq!(parse_default_literal("0"), Value::Int64(0));
4219    }
4220
4221    #[test]
4222    fn parse_default_literal_float() {
4223        assert_eq!(parse_default_literal("9.81"), Value::Float64(9.81_f64));
4224        assert_eq!(parse_default_literal("-0.5"), Value::Float64(-0.5));
4225    }
4226
4227    #[test]
4228    fn parse_default_literal_fallback_string() {
4229        // Not a recognized literal, not quoted, not a number
4230        assert_eq!(
4231            parse_default_literal("some_identifier"),
4232            Value::String("some_identifier".into())
4233        );
4234    }
4235
4236    #[test]
4237    fn test_session_create_node() {
4238        let db = GrafeoDB::new_in_memory();
4239        let session = db.session();
4240
4241        let id = session.create_node(&["Person"]);
4242        assert!(id.is_valid());
4243        assert_eq!(db.node_count(), 1);
4244    }
4245
4246    #[test]
4247    fn test_session_transaction() {
4248        let db = GrafeoDB::new_in_memory();
4249        let mut session = db.session();
4250
4251        assert!(!session.in_transaction());
4252
4253        session.begin_transaction().unwrap();
4254        assert!(session.in_transaction());
4255
4256        session.commit().unwrap();
4257        assert!(!session.in_transaction());
4258    }
4259
4260    #[test]
4261    fn test_session_transaction_context() {
4262        let db = GrafeoDB::new_in_memory();
4263        let mut session = db.session();
4264
4265        // Without transaction - context should have current epoch and no transaction_id
4266        let (_epoch1, transaction_id1) = session.get_transaction_context();
4267        assert!(transaction_id1.is_none());
4268
4269        // Start a transaction
4270        session.begin_transaction().unwrap();
4271        let (epoch2, transaction_id2) = session.get_transaction_context();
4272        assert!(transaction_id2.is_some());
4273        // Transaction should have a valid epoch
4274        let _ = epoch2; // Use the variable
4275
4276        // Commit and verify
4277        session.commit().unwrap();
4278        let (epoch3, tx_id3) = session.get_transaction_context();
4279        assert!(tx_id3.is_none());
4280        // Epoch should have advanced after commit
4281        assert!(epoch3.as_u64() >= epoch2.as_u64());
4282    }
4283
4284    #[test]
4285    fn test_session_rollback() {
4286        let db = GrafeoDB::new_in_memory();
4287        let mut session = db.session();
4288
4289        session.begin_transaction().unwrap();
4290        session.rollback().unwrap();
4291        assert!(!session.in_transaction());
4292    }
4293
4294    #[test]
4295    fn test_session_rollback_discards_versions() {
4296        use grafeo_common::types::TransactionId;
4297
4298        let db = GrafeoDB::new_in_memory();
4299
4300        // Create a node outside of any transaction (at system level)
4301        let node_before = db.store().create_node(&["Person"]);
4302        assert!(node_before.is_valid());
4303        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4304
4305        // Start a transaction
4306        let mut session = db.session();
4307        session.begin_transaction().unwrap();
4308        let transaction_id = session.current_transaction.lock().unwrap();
4309
4310        // Create a node versioned with the transaction's ID
4311        let epoch = db.store().current_epoch();
4312        let node_in_tx = db
4313            .store()
4314            .create_node_versioned(&["Person"], epoch, transaction_id);
4315        assert!(node_in_tx.is_valid());
4316
4317        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4318        // non-versioned lookups like node_count(). Verify the node is visible
4319        // only through the owning transaction.
4320        assert_eq!(
4321            db.node_count(),
4322            1,
4323            "PENDING nodes should be invisible to non-versioned node_count()"
4324        );
4325        assert!(
4326            db.store()
4327                .get_node_versioned(node_in_tx, epoch, transaction_id)
4328                .is_some(),
4329            "Transaction node should be visible to its own transaction"
4330        );
4331
4332        // Rollback the transaction
4333        session.rollback().unwrap();
4334        assert!(!session.in_transaction());
4335
4336        // The node created in the transaction should be discarded
4337        // Only the first node should remain visible
4338        let count_after = db.node_count();
4339        assert_eq!(
4340            count_after, 1,
4341            "Rollback should discard uncommitted node, but got {count_after}"
4342        );
4343
4344        // The original node should still be accessible
4345        let current_epoch = db.store().current_epoch();
4346        assert!(
4347            db.store()
4348                .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
4349                .is_some(),
4350            "Original node should still exist"
4351        );
4352
4353        // The node created in the transaction should not be accessible
4354        assert!(
4355            db.store()
4356                .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
4357                .is_none(),
4358            "Transaction node should be gone"
4359        );
4360    }
4361
4362    #[test]
4363    fn test_session_create_node_in_transaction() {
4364        // Test that session.create_node() is transaction-aware
4365        let db = GrafeoDB::new_in_memory();
4366
4367        // Create a node outside of any transaction
4368        let node_before = db.create_node(&["Person"]);
4369        assert!(node_before.is_valid());
4370        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4371
4372        // Start a transaction and create a node through the session
4373        let mut session = db.session();
4374        session.begin_transaction().unwrap();
4375        let transaction_id = session.current_transaction.lock().unwrap();
4376
4377        // Create a node through session.create_node() - should be versioned with tx
4378        let node_in_tx = session.create_node(&["Person"]);
4379        assert!(node_in_tx.is_valid());
4380
4381        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4382        // non-versioned lookups. Verify the node is visible only to its own tx.
4383        assert_eq!(
4384            db.node_count(),
4385            1,
4386            "PENDING nodes should be invisible to non-versioned node_count()"
4387        );
4388        let epoch = db.store().current_epoch();
4389        assert!(
4390            db.store()
4391                .get_node_versioned(node_in_tx, epoch, transaction_id)
4392                .is_some(),
4393            "Transaction node should be visible to its own transaction"
4394        );
4395
4396        // Rollback the transaction
4397        session.rollback().unwrap();
4398
4399        // The node created via session.create_node() should be discarded
4400        let count_after = db.node_count();
4401        assert_eq!(
4402            count_after, 1,
4403            "Rollback should discard node created via session.create_node(), but got {count_after}"
4404        );
4405    }
4406
4407    #[test]
4408    fn test_session_create_node_with_props_in_transaction() {
4409        use grafeo_common::types::Value;
4410
4411        // Test that session.create_node_with_props() is transaction-aware
4412        let db = GrafeoDB::new_in_memory();
4413
4414        // Create a node outside of any transaction
4415        db.create_node(&["Person"]);
4416        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4417
4418        // Start a transaction and create a node with properties
4419        let mut session = db.session();
4420        session.begin_transaction().unwrap();
4421        let transaction_id = session.current_transaction.lock().unwrap();
4422
4423        let node_in_tx =
4424            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4425        assert!(node_in_tx.is_valid());
4426
4427        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4428        // non-versioned lookups. Verify the node is visible only to its own tx.
4429        assert_eq!(
4430            db.node_count(),
4431            1,
4432            "PENDING nodes should be invisible to non-versioned node_count()"
4433        );
4434        let epoch = db.store().current_epoch();
4435        assert!(
4436            db.store()
4437                .get_node_versioned(node_in_tx, epoch, transaction_id)
4438                .is_some(),
4439            "Transaction node should be visible to its own transaction"
4440        );
4441
4442        // Rollback the transaction
4443        session.rollback().unwrap();
4444
4445        // The node should be discarded
4446        let count_after = db.node_count();
4447        assert_eq!(
4448            count_after, 1,
4449            "Rollback should discard node created via session.create_node_with_props()"
4450        );
4451    }
4452
4453    #[cfg(feature = "gql")]
4454    mod gql_tests {
4455        use super::*;
4456
4457        #[test]
4458        fn test_gql_query_execution() {
4459            let db = GrafeoDB::new_in_memory();
4460            let session = db.session();
4461
4462            // Create some test data
4463            session.create_node(&["Person"]);
4464            session.create_node(&["Person"]);
4465            session.create_node(&["Animal"]);
4466
4467            // Execute a GQL query
4468            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
4469
4470            // Should return 2 Person nodes
4471            assert_eq!(result.row_count(), 2);
4472            assert_eq!(result.column_count(), 1);
4473            assert_eq!(result.columns[0], "n");
4474        }
4475
4476        #[test]
4477        fn test_gql_empty_result() {
4478            let db = GrafeoDB::new_in_memory();
4479            let session = db.session();
4480
4481            // No data in database
4482            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
4483
4484            assert_eq!(result.row_count(), 0);
4485        }
4486
4487        #[test]
4488        fn test_gql_parse_error() {
4489            let db = GrafeoDB::new_in_memory();
4490            let session = db.session();
4491
4492            // Invalid GQL syntax
4493            let result = session.execute("MATCH (n RETURN n");
4494
4495            assert!(result.is_err());
4496        }
4497
4498        #[test]
4499        fn test_gql_relationship_traversal() {
4500            let db = GrafeoDB::new_in_memory();
4501            let session = db.session();
4502
4503            // Create a graph: Alix -> Gus, Alix -> Vincent
4504            let alix = session.create_node(&["Person"]);
4505            let gus = session.create_node(&["Person"]);
4506            let vincent = session.create_node(&["Person"]);
4507
4508            session.create_edge(alix, gus, "KNOWS");
4509            session.create_edge(alix, vincent, "KNOWS");
4510
4511            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
4512            let result = session
4513                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4514                .unwrap();
4515
4516            // Should return 2 rows (Alix->Gus, Alix->Vincent)
4517            assert_eq!(result.row_count(), 2);
4518            assert_eq!(result.column_count(), 2);
4519            assert_eq!(result.columns[0], "a");
4520            assert_eq!(result.columns[1], "b");
4521        }
4522
4523        #[test]
4524        fn test_gql_relationship_with_type_filter() {
4525            let db = GrafeoDB::new_in_memory();
4526            let session = db.session();
4527
4528            // Create a graph: Alix -KNOWS-> Gus, Alix -WORKS_WITH-> Vincent
4529            let alix = session.create_node(&["Person"]);
4530            let gus = session.create_node(&["Person"]);
4531            let vincent = session.create_node(&["Person"]);
4532
4533            session.create_edge(alix, gus, "KNOWS");
4534            session.create_edge(alix, vincent, "WORKS_WITH");
4535
4536            // Query only KNOWS relationships
4537            let result = session
4538                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4539                .unwrap();
4540
4541            // Should return only 1 row (Alix->Gus)
4542            assert_eq!(result.row_count(), 1);
4543        }
4544
4545        #[test]
4546        fn test_gql_semantic_error_undefined_variable() {
4547            let db = GrafeoDB::new_in_memory();
4548            let session = db.session();
4549
4550            // Reference undefined variable 'x' in RETURN
4551            let result = session.execute("MATCH (n:Person) RETURN x");
4552
4553            // Should fail with semantic error
4554            assert!(result.is_err());
4555            let Err(err) = result else {
4556                panic!("Expected error")
4557            };
4558            assert!(
4559                err.to_string().contains("Undefined variable"),
4560                "Expected undefined variable error, got: {}",
4561                err
4562            );
4563        }
4564
4565        #[test]
4566        fn test_gql_where_clause_property_filter() {
4567            use grafeo_common::types::Value;
4568
4569            let db = GrafeoDB::new_in_memory();
4570            let session = db.session();
4571
4572            // Create people with ages
4573            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
4574            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
4575            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
4576
4577            // Query with WHERE clause: age > 30
4578            let result = session
4579                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
4580                .unwrap();
4581
4582            // Should return 2 people (ages 35 and 45)
4583            assert_eq!(result.row_count(), 2);
4584        }
4585
4586        #[test]
4587        fn test_gql_where_clause_equality() {
4588            use grafeo_common::types::Value;
4589
4590            let db = GrafeoDB::new_in_memory();
4591            let session = db.session();
4592
4593            // Create people with names
4594            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4595            session.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
4596            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4597
4598            // Query with WHERE clause: name = "Alix"
4599            let result = session
4600                .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
4601                .unwrap();
4602
4603            // Should return 2 people named Alix
4604            assert_eq!(result.row_count(), 2);
4605        }
4606
4607        #[test]
4608        fn test_gql_return_property_access() {
4609            use grafeo_common::types::Value;
4610
4611            let db = GrafeoDB::new_in_memory();
4612            let session = db.session();
4613
4614            // Create people with names and ages
4615            session.create_node_with_props(
4616                &["Person"],
4617                [
4618                    ("name", Value::String("Alix".into())),
4619                    ("age", Value::Int64(30)),
4620                ],
4621            );
4622            session.create_node_with_props(
4623                &["Person"],
4624                [
4625                    ("name", Value::String("Gus".into())),
4626                    ("age", Value::Int64(25)),
4627                ],
4628            );
4629
4630            // Query returning properties
4631            let result = session
4632                .execute("MATCH (n:Person) RETURN n.name, n.age")
4633                .unwrap();
4634
4635            // Should return 2 rows with name and age columns
4636            assert_eq!(result.row_count(), 2);
4637            assert_eq!(result.column_count(), 2);
4638            assert_eq!(result.columns[0], "n.name");
4639            assert_eq!(result.columns[1], "n.age");
4640
4641            // Check that we get actual values
4642            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
4643            assert!(names.contains(&&Value::String("Alix".into())));
4644            assert!(names.contains(&&Value::String("Gus".into())));
4645        }
4646
4647        #[test]
4648        fn test_gql_return_mixed_expressions() {
4649            use grafeo_common::types::Value;
4650
4651            let db = GrafeoDB::new_in_memory();
4652            let session = db.session();
4653
4654            // Create a person
4655            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4656
4657            // Query returning both node and property
4658            let result = session
4659                .execute("MATCH (n:Person) RETURN n, n.name")
4660                .unwrap();
4661
4662            assert_eq!(result.row_count(), 1);
4663            assert_eq!(result.column_count(), 2);
4664            assert_eq!(result.columns[0], "n");
4665            assert_eq!(result.columns[1], "n.name");
4666
4667            // Second column should be the name
4668            assert_eq!(result.rows[0][1], Value::String("Alix".into()));
4669        }
4670    }
4671
4672    #[cfg(feature = "cypher")]
4673    mod cypher_tests {
4674        use super::*;
4675
4676        #[test]
4677        fn test_cypher_query_execution() {
4678            let db = GrafeoDB::new_in_memory();
4679            let session = db.session();
4680
4681            // Create some test data
4682            session.create_node(&["Person"]);
4683            session.create_node(&["Person"]);
4684            session.create_node(&["Animal"]);
4685
4686            // Execute a Cypher query
4687            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4688
4689            // Should return 2 Person nodes
4690            assert_eq!(result.row_count(), 2);
4691            assert_eq!(result.column_count(), 1);
4692            assert_eq!(result.columns[0], "n");
4693        }
4694
4695        #[test]
4696        fn test_cypher_empty_result() {
4697            let db = GrafeoDB::new_in_memory();
4698            let session = db.session();
4699
4700            // No data in database
4701            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4702
4703            assert_eq!(result.row_count(), 0);
4704        }
4705
4706        #[test]
4707        fn test_cypher_parse_error() {
4708            let db = GrafeoDB::new_in_memory();
4709            let session = db.session();
4710
4711            // Invalid Cypher syntax
4712            let result = session.execute_cypher("MATCH (n RETURN n");
4713
4714            assert!(result.is_err());
4715        }
4716    }
4717
4718    // ==================== Direct Lookup API Tests ====================
4719
4720    mod direct_lookup_tests {
4721        use super::*;
4722        use grafeo_common::types::Value;
4723
4724        #[test]
4725        fn test_get_node() {
4726            let db = GrafeoDB::new_in_memory();
4727            let session = db.session();
4728
4729            let id = session.create_node(&["Person"]);
4730            let node = session.get_node(id);
4731
4732            assert!(node.is_some());
4733            let node = node.unwrap();
4734            assert_eq!(node.id, id);
4735        }
4736
4737        #[test]
4738        fn test_get_node_not_found() {
4739            use grafeo_common::types::NodeId;
4740
4741            let db = GrafeoDB::new_in_memory();
4742            let session = db.session();
4743
4744            // Try to get a non-existent node
4745            let node = session.get_node(NodeId::new(9999));
4746            assert!(node.is_none());
4747        }
4748
4749        #[test]
4750        fn test_get_node_property() {
4751            let db = GrafeoDB::new_in_memory();
4752            let session = db.session();
4753
4754            let id = session
4755                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4756
4757            let name = session.get_node_property(id, "name");
4758            assert_eq!(name, Some(Value::String("Alix".into())));
4759
4760            // Non-existent property
4761            let missing = session.get_node_property(id, "missing");
4762            assert!(missing.is_none());
4763        }
4764
4765        #[test]
4766        fn test_get_edge() {
4767            let db = GrafeoDB::new_in_memory();
4768            let session = db.session();
4769
4770            let alix = session.create_node(&["Person"]);
4771            let gus = session.create_node(&["Person"]);
4772            let edge_id = session.create_edge(alix, gus, "KNOWS");
4773
4774            let edge = session.get_edge(edge_id);
4775            assert!(edge.is_some());
4776            let edge = edge.unwrap();
4777            assert_eq!(edge.id, edge_id);
4778            assert_eq!(edge.src, alix);
4779            assert_eq!(edge.dst, gus);
4780        }
4781
4782        #[test]
4783        fn test_get_edge_not_found() {
4784            use grafeo_common::types::EdgeId;
4785
4786            let db = GrafeoDB::new_in_memory();
4787            let session = db.session();
4788
4789            let edge = session.get_edge(EdgeId::new(9999));
4790            assert!(edge.is_none());
4791        }
4792
4793        #[test]
4794        fn test_get_neighbors_outgoing() {
4795            let db = GrafeoDB::new_in_memory();
4796            let session = db.session();
4797
4798            let alix = session.create_node(&["Person"]);
4799            let gus = session.create_node(&["Person"]);
4800            let harm = session.create_node(&["Person"]);
4801
4802            session.create_edge(alix, gus, "KNOWS");
4803            session.create_edge(alix, harm, "KNOWS");
4804
4805            let neighbors = session.get_neighbors_outgoing(alix);
4806            assert_eq!(neighbors.len(), 2);
4807
4808            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4809            assert!(neighbor_ids.contains(&gus));
4810            assert!(neighbor_ids.contains(&harm));
4811        }
4812
4813        #[test]
4814        fn test_get_neighbors_incoming() {
4815            let db = GrafeoDB::new_in_memory();
4816            let session = db.session();
4817
4818            let alix = session.create_node(&["Person"]);
4819            let gus = session.create_node(&["Person"]);
4820            let harm = session.create_node(&["Person"]);
4821
4822            session.create_edge(gus, alix, "KNOWS");
4823            session.create_edge(harm, alix, "KNOWS");
4824
4825            let neighbors = session.get_neighbors_incoming(alix);
4826            assert_eq!(neighbors.len(), 2);
4827
4828            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4829            assert!(neighbor_ids.contains(&gus));
4830            assert!(neighbor_ids.contains(&harm));
4831        }
4832
4833        #[test]
4834        fn test_get_neighbors_outgoing_by_type() {
4835            let db = GrafeoDB::new_in_memory();
4836            let session = db.session();
4837
4838            let alix = session.create_node(&["Person"]);
4839            let gus = session.create_node(&["Person"]);
4840            let company = session.create_node(&["Company"]);
4841
4842            session.create_edge(alix, gus, "KNOWS");
4843            session.create_edge(alix, company, "WORKS_AT");
4844
4845            let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
4846            assert_eq!(knows_neighbors.len(), 1);
4847            assert_eq!(knows_neighbors[0].0, gus);
4848
4849            let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
4850            assert_eq!(works_neighbors.len(), 1);
4851            assert_eq!(works_neighbors[0].0, company);
4852
4853            // No edges of this type
4854            let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
4855            assert!(no_neighbors.is_empty());
4856        }
4857
4858        #[test]
4859        fn test_node_exists() {
4860            use grafeo_common::types::NodeId;
4861
4862            let db = GrafeoDB::new_in_memory();
4863            let session = db.session();
4864
4865            let id = session.create_node(&["Person"]);
4866
4867            assert!(session.node_exists(id));
4868            assert!(!session.node_exists(NodeId::new(9999)));
4869        }
4870
4871        #[test]
4872        fn test_edge_exists() {
4873            use grafeo_common::types::EdgeId;
4874
4875            let db = GrafeoDB::new_in_memory();
4876            let session = db.session();
4877
4878            let alix = session.create_node(&["Person"]);
4879            let gus = session.create_node(&["Person"]);
4880            let edge_id = session.create_edge(alix, gus, "KNOWS");
4881
4882            assert!(session.edge_exists(edge_id));
4883            assert!(!session.edge_exists(EdgeId::new(9999)));
4884        }
4885
4886        #[test]
4887        fn test_get_degree() {
4888            let db = GrafeoDB::new_in_memory();
4889            let session = db.session();
4890
4891            let alix = session.create_node(&["Person"]);
4892            let gus = session.create_node(&["Person"]);
4893            let harm = session.create_node(&["Person"]);
4894
4895            // Alix knows Gus and Harm (2 outgoing)
4896            session.create_edge(alix, gus, "KNOWS");
4897            session.create_edge(alix, harm, "KNOWS");
4898            // Gus knows Alix (1 incoming for Alix)
4899            session.create_edge(gus, alix, "KNOWS");
4900
4901            let (out_degree, in_degree) = session.get_degree(alix);
4902            assert_eq!(out_degree, 2);
4903            assert_eq!(in_degree, 1);
4904
4905            // Node with no edges
4906            let lonely = session.create_node(&["Person"]);
4907            let (out, in_deg) = session.get_degree(lonely);
4908            assert_eq!(out, 0);
4909            assert_eq!(in_deg, 0);
4910        }
4911
4912        #[test]
4913        fn test_get_nodes_batch() {
4914            let db = GrafeoDB::new_in_memory();
4915            let session = db.session();
4916
4917            let alix = session.create_node(&["Person"]);
4918            let gus = session.create_node(&["Person"]);
4919            let harm = session.create_node(&["Person"]);
4920
4921            let nodes = session.get_nodes_batch(&[alix, gus, harm]);
4922            assert_eq!(nodes.len(), 3);
4923            assert!(nodes[0].is_some());
4924            assert!(nodes[1].is_some());
4925            assert!(nodes[2].is_some());
4926
4927            // With non-existent node
4928            use grafeo_common::types::NodeId;
4929            let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
4930            assert_eq!(nodes_with_missing.len(), 3);
4931            assert!(nodes_with_missing[0].is_some());
4932            assert!(nodes_with_missing[1].is_none()); // Missing node
4933            assert!(nodes_with_missing[2].is_some());
4934        }
4935
4936        #[test]
4937        fn test_auto_commit_setting() {
4938            let db = GrafeoDB::new_in_memory();
4939            let mut session = db.session();
4940
4941            // Default is auto-commit enabled
4942            assert!(session.auto_commit());
4943
4944            session.set_auto_commit(false);
4945            assert!(!session.auto_commit());
4946
4947            session.set_auto_commit(true);
4948            assert!(session.auto_commit());
4949        }
4950
4951        #[test]
4952        fn test_transaction_double_begin_nests() {
4953            let db = GrafeoDB::new_in_memory();
4954            let mut session = db.session();
4955
4956            session.begin_transaction().unwrap();
4957            // Second begin_transaction creates a nested transaction (auto-savepoint)
4958            let result = session.begin_transaction();
4959            assert!(result.is_ok());
4960            // Commit the inner (releases savepoint)
4961            session.commit().unwrap();
4962            // Commit the outer
4963            session.commit().unwrap();
4964        }
4965
4966        #[test]
4967        fn test_commit_without_transaction_error() {
4968            let db = GrafeoDB::new_in_memory();
4969            let mut session = db.session();
4970
4971            let result = session.commit();
4972            assert!(result.is_err());
4973        }
4974
4975        #[test]
4976        fn test_rollback_without_transaction_error() {
4977            let db = GrafeoDB::new_in_memory();
4978            let mut session = db.session();
4979
4980            let result = session.rollback();
4981            assert!(result.is_err());
4982        }
4983
4984        #[test]
4985        fn test_create_edge_in_transaction() {
4986            let db = GrafeoDB::new_in_memory();
4987            let mut session = db.session();
4988
4989            // Create nodes outside transaction
4990            let alix = session.create_node(&["Person"]);
4991            let gus = session.create_node(&["Person"]);
4992
4993            // Create edge in transaction
4994            session.begin_transaction().unwrap();
4995            let edge_id = session.create_edge(alix, gus, "KNOWS");
4996
4997            // Edge should be visible in the transaction
4998            assert!(session.edge_exists(edge_id));
4999
5000            // Commit
5001            session.commit().unwrap();
5002
5003            // Edge should still be visible
5004            assert!(session.edge_exists(edge_id));
5005        }
5006
5007        #[test]
5008        fn test_neighbors_empty_node() {
5009            let db = GrafeoDB::new_in_memory();
5010            let session = db.session();
5011
5012            let lonely = session.create_node(&["Person"]);
5013
5014            assert!(session.get_neighbors_outgoing(lonely).is_empty());
5015            assert!(session.get_neighbors_incoming(lonely).is_empty());
5016            assert!(
5017                session
5018                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
5019                    .is_empty()
5020            );
5021        }
5022    }
5023
5024    #[test]
5025    fn test_auto_gc_triggers_on_commit_interval() {
5026        use crate::config::Config;
5027
5028        let config = Config::in_memory().with_gc_interval(2);
5029        let db = GrafeoDB::with_config(config).unwrap();
5030        let mut session = db.session();
5031
5032        // First commit: counter = 1, no GC (not a multiple of 2)
5033        session.begin_transaction().unwrap();
5034        session.create_node(&["A"]);
5035        session.commit().unwrap();
5036
5037        // Second commit: counter = 2, GC should trigger (multiple of 2)
5038        session.begin_transaction().unwrap();
5039        session.create_node(&["B"]);
5040        session.commit().unwrap();
5041
5042        // Verify the database is still functional after GC
5043        assert_eq!(db.node_count(), 2);
5044    }
5045
5046    #[test]
5047    fn test_query_timeout_config_propagates_to_session() {
5048        use crate::config::Config;
5049        use std::time::Duration;
5050
5051        let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
5052        let db = GrafeoDB::with_config(config).unwrap();
5053        let session = db.session();
5054
5055        // Verify the session has a query deadline (timeout was set)
5056        assert!(session.query_deadline().is_some());
5057    }
5058
5059    #[test]
5060    fn test_no_query_timeout_returns_no_deadline() {
5061        let db = GrafeoDB::new_in_memory();
5062        let session = db.session();
5063
5064        // Default config has no timeout
5065        assert!(session.query_deadline().is_none());
5066    }
5067
5068    #[test]
5069    fn test_graph_model_accessor() {
5070        use crate::config::GraphModel;
5071
5072        let db = GrafeoDB::new_in_memory();
5073        let session = db.session();
5074
5075        assert_eq!(session.graph_model(), GraphModel::Lpg);
5076    }
5077
5078    #[cfg(feature = "gql")]
5079    #[test]
5080    fn test_external_store_session() {
5081        use grafeo_core::graph::GraphStoreMut;
5082        use std::sync::Arc;
5083
5084        let config = crate::config::Config::in_memory();
5085        let store =
5086            Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
5087        let db = GrafeoDB::with_store(store, config).unwrap();
5088
5089        let mut session = db.session();
5090
5091        // Use an explicit transaction so that INSERT and MATCH share the same
5092        // transaction context. With PENDING epochs, uncommitted versions are
5093        // only visible to the owning transaction.
5094        session.begin_transaction().unwrap();
5095        session.execute("INSERT (:Test {name: 'hello'})").unwrap();
5096
5097        // Verify we can query through it within the same transaction
5098        let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
5099        assert_eq!(result.row_count(), 1);
5100
5101        session.commit().unwrap();
5102    }
5103
5104    // ==================== Session Command Tests ====================
5105
5106    #[cfg(feature = "gql")]
5107    mod session_command_tests {
5108        use super::*;
5109        use grafeo_common::types::Value;
5110
5111        #[test]
5112        fn test_use_graph_sets_current_graph() {
5113            let db = GrafeoDB::new_in_memory();
5114            let session = db.session();
5115
5116            // Create the graph first, then USE it
5117            session.execute("CREATE GRAPH mydb").unwrap();
5118            session.execute("USE GRAPH mydb").unwrap();
5119
5120            assert_eq!(session.current_graph(), Some("mydb".to_string()));
5121        }
5122
5123        #[test]
5124        fn test_use_graph_nonexistent_errors() {
5125            let db = GrafeoDB::new_in_memory();
5126            let session = db.session();
5127
5128            let result = session.execute("USE GRAPH doesnotexist");
5129            assert!(result.is_err());
5130            let err = result.unwrap_err().to_string();
5131            assert!(
5132                err.contains("does not exist"),
5133                "Expected 'does not exist' error, got: {err}"
5134            );
5135        }
5136
5137        #[test]
5138        fn test_use_graph_default_always_valid() {
5139            let db = GrafeoDB::new_in_memory();
5140            let session = db.session();
5141
5142            // "default" is always valid, even without CREATE GRAPH
5143            session.execute("USE GRAPH default").unwrap();
5144            assert_eq!(session.current_graph(), Some("default".to_string()));
5145        }
5146
5147        #[test]
5148        fn test_session_set_graph() {
5149            let db = GrafeoDB::new_in_memory();
5150            let session = db.session();
5151
5152            session.execute("CREATE GRAPH analytics").unwrap();
5153            session.execute("SESSION SET GRAPH analytics").unwrap();
5154            assert_eq!(session.current_graph(), Some("analytics".to_string()));
5155        }
5156
5157        #[test]
5158        fn test_session_set_graph_nonexistent_errors() {
5159            let db = GrafeoDB::new_in_memory();
5160            let session = db.session();
5161
5162            let result = session.execute("SESSION SET GRAPH nosuchgraph");
5163            assert!(result.is_err());
5164        }
5165
5166        #[test]
5167        fn test_session_set_time_zone() {
5168            let db = GrafeoDB::new_in_memory();
5169            let session = db.session();
5170
5171            assert_eq!(session.time_zone(), None);
5172
5173            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5174            assert_eq!(session.time_zone(), Some("UTC".to_string()));
5175
5176            session
5177                .execute("SESSION SET TIME ZONE 'America/New_York'")
5178                .unwrap();
5179            assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
5180        }
5181
5182        #[test]
5183        fn test_session_set_parameter() {
5184            let db = GrafeoDB::new_in_memory();
5185            let session = db.session();
5186
5187            session
5188                .execute("SESSION SET PARAMETER $timeout = 30")
5189                .unwrap();
5190
5191            // Parameter is stored (value is Null for now, since expression
5192            // evaluation is not yet wired up)
5193            assert!(session.get_parameter("timeout").is_some());
5194        }
5195
5196        #[test]
5197        fn test_session_reset_clears_all_state() {
5198            let db = GrafeoDB::new_in_memory();
5199            let session = db.session();
5200
5201            // Set various session state
5202            session.execute("CREATE GRAPH analytics").unwrap();
5203            session.execute("SESSION SET GRAPH analytics").unwrap();
5204            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5205            session
5206                .execute("SESSION SET PARAMETER $limit = 100")
5207                .unwrap();
5208
5209            // Verify state was set
5210            assert!(session.current_graph().is_some());
5211            assert!(session.time_zone().is_some());
5212            assert!(session.get_parameter("limit").is_some());
5213
5214            // Reset everything
5215            session.execute("SESSION RESET").unwrap();
5216
5217            assert_eq!(session.current_graph(), None);
5218            assert_eq!(session.time_zone(), None);
5219            assert!(session.get_parameter("limit").is_none());
5220        }
5221
5222        #[test]
5223        fn test_session_close_clears_state() {
5224            let db = GrafeoDB::new_in_memory();
5225            let session = db.session();
5226
5227            session.execute("CREATE GRAPH analytics").unwrap();
5228            session.execute("SESSION SET GRAPH analytics").unwrap();
5229            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
5230
5231            session.execute("SESSION CLOSE").unwrap();
5232
5233            assert_eq!(session.current_graph(), None);
5234            assert_eq!(session.time_zone(), None);
5235        }
5236
5237        #[test]
5238        fn test_create_graph() {
5239            let db = GrafeoDB::new_in_memory();
5240            let session = db.session();
5241
5242            session.execute("CREATE GRAPH mydb").unwrap();
5243
5244            // Should be able to USE it now
5245            session.execute("USE GRAPH mydb").unwrap();
5246            assert_eq!(session.current_graph(), Some("mydb".to_string()));
5247        }
5248
5249        #[test]
5250        fn test_create_graph_duplicate_errors() {
5251            let db = GrafeoDB::new_in_memory();
5252            let session = db.session();
5253
5254            session.execute("CREATE GRAPH mydb").unwrap();
5255            let result = session.execute("CREATE GRAPH mydb");
5256
5257            assert!(result.is_err());
5258            let err = result.unwrap_err().to_string();
5259            assert!(
5260                err.contains("already exists"),
5261                "Expected 'already exists' error, got: {err}"
5262            );
5263        }
5264
5265        #[test]
5266        fn test_create_graph_if_not_exists() {
5267            let db = GrafeoDB::new_in_memory();
5268            let session = db.session();
5269
5270            session.execute("CREATE GRAPH mydb").unwrap();
5271            // Should succeed silently with IF NOT EXISTS
5272            session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
5273        }
5274
/// After `DROP GRAPH`, selecting the dropped graph with `USE GRAPH` fails.
#[test]
fn test_drop_graph() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Create the graph and immediately remove it again.
    sess.execute("CREATE GRAPH mydb").unwrap();
    sess.execute("DROP GRAPH mydb").unwrap();

    // The dropped graph can no longer be selected.
    let use_after_drop = sess.execute("USE GRAPH mydb");
    assert!(use_after_drop.is_err());
}
5287
/// Dropping a graph that was never created fails with a "does not exist"
/// error.
#[test]
fn test_drop_graph_nonexistent_errors() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let drop_outcome = sess.execute("DROP GRAPH nosuchgraph");
    assert!(drop_outcome.is_err());

    let err = drop_outcome.unwrap_err().to_string();
    assert!(
        err.contains("does not exist"),
        "Expected 'does not exist' error, got: {err}"
    );
}
5301
/// `DROP GRAPH IF EXISTS` on a graph that was never created returns `Ok`.
#[test]
fn test_drop_graph_if_exists() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // The guarded form must not report the missing graph as an error.
    let guarded = sess.execute("DROP GRAPH IF EXISTS nosuchgraph");
    guarded.unwrap();
}
5310
/// `START TRANSACTION` / `COMMIT` issued as statements toggle the session's
/// transaction state, and the inserted node is visible after the commit.
#[test]
fn test_start_transaction_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Open the transaction through the query interface.
    sess.execute("START TRANSACTION").unwrap();
    assert!(sess.in_transaction());

    sess.execute("INSERT (:Person {name: 'Alix'})").unwrap();

    // Committing must close the transaction again.
    sess.execute("COMMIT").unwrap();
    assert!(!sess.in_transaction());

    let people = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert_eq!(people.rows.len(), 1);
}
5325
/// An `INSERT` inside a `READ ONLY` transaction fails with a "read-only"
/// error; the transaction can then be rolled back.
#[test]
fn test_start_transaction_read_only_blocks_insert() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    sess.execute("START TRANSACTION READ ONLY").unwrap();

    let write_attempt = sess.execute("INSERT (:Person {name: 'Alix'})");
    assert!(write_attempt.is_err());

    let err = write_attempt.unwrap_err().to_string();
    assert!(
        err.contains("read-only"),
        "Expected read-only error, got: {err}"
    );

    // Leave the session without an open transaction.
    sess.execute("ROLLBACK").unwrap();
}
5341
/// A `MATCH` inside a `READ ONLY` transaction succeeds and sees data that was
/// committed beforehand.
#[test]
fn test_start_transaction_read_only_allows_reads() {
    let database = GrafeoDB::new_in_memory();
    let mut sess = database.session();

    // Seed one node through the programmatic transaction API.
    sess.begin_transaction().unwrap();
    sess.execute("INSERT (:Person {name: 'Alix'})").unwrap();
    sess.commit().unwrap();

    // Reads are still permitted inside the read-only transaction.
    sess.execute("START TRANSACTION READ ONLY").unwrap();
    let people = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert_eq!(people.rows.len(), 1);
    sess.execute("COMMIT").unwrap();
}
5355
/// A node inserted inside a transaction is gone after `ROLLBACK`.
#[test]
fn test_rollback_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Insert inside a transaction, then discard the work.
    sess.execute("START TRANSACTION").unwrap();
    sess.execute("INSERT (:Person {name: 'Alix'})").unwrap();
    sess.execute("ROLLBACK").unwrap();

    let people = sess.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert!(people.rows.is_empty());
}
5368
/// `START TRANSACTION` with an explicit `ISOLATION LEVEL SERIALIZABLE` clause
/// is accepted and opens a transaction.
#[test]
fn test_start_transaction_with_isolation_level() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let started = sess.execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE");
    started.unwrap();
    assert!(sess.in_transaction());

    // Clean up the open transaction.
    sess.execute("ROLLBACK").unwrap();
}
5380
/// `SESSION SET GRAPH` yields a result with zero rows and zero columns.
#[test]
fn test_session_commands_return_empty_result() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // The graph has to exist before the session can be pointed at it.
    sess.execute("CREATE GRAPH test").unwrap();

    let outcome = sess.execute("SESSION SET GRAPH test").unwrap();
    assert_eq!(outcome.row_count(), 0);
    assert_eq!(outcome.column_count(), 0);
}
5391
/// A brand-new session reports no current graph.
#[test]
fn test_current_graph_default_is_none() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let active = sess.current_graph();
    assert_eq!(active, None);
}
5399
/// A brand-new session reports no time zone.
#[test]
fn test_time_zone_default_is_none() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let zone = sess.time_zone();
    assert_eq!(zone, None);
}
5407
/// Two sessions on the same database each keep their own current graph.
#[test]
fn test_session_state_independent_across_sessions() {
    let database = GrafeoDB::new_in_memory();
    let first_session = database.session();
    let second_session = database.session();

    // Both graphs are created through the first session only.
    first_session.execute("CREATE GRAPH first").unwrap();
    first_session.execute("CREATE GRAPH second").unwrap();

    // Each session selects a different graph.
    first_session.execute("SESSION SET GRAPH first").unwrap();
    second_session.execute("SESSION SET GRAPH second").unwrap();

    assert_eq!(first_session.current_graph(), Some(String::from("first")));
    assert_eq!(second_session.current_graph(), Some(String::from("second")));
}
5422
/// `SHOW NODE TYPES` lists a previously created node type under the expected
/// column headers.
#[test]
fn test_show_node_types() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let create_ddl = "CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)";
    sess.execute(create_ddl).unwrap();

    let listing = sess.execute("SHOW NODE TYPES").unwrap();

    let expected_columns = vec!["name", "properties", "constraints", "parents"];
    assert_eq!(listing.columns, expected_columns);
    assert_eq!(listing.rows.len(), 1);

    // The type name is reported in the first column.
    assert_eq!(listing.rows[0][0], Value::from("Person"));
}
5441
/// `SHOW EDGE TYPES` lists a previously created edge type under the expected
/// column headers.
#[test]
fn test_show_edge_types() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let create_ddl = "CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)";
    sess.execute(create_ddl).unwrap();

    let listing = sess.execute("SHOW EDGE TYPES").unwrap();

    let expected_columns = vec!["name", "properties", "source_types", "target_types"];
    assert_eq!(listing.columns, expected_columns);
    assert_eq!(listing.rows.len(), 1);

    // The type name is reported in the first column.
    assert_eq!(listing.rows[0][0], Value::from("KNOWS"));
}
5459
/// `SHOW GRAPH TYPES` lists a previously created graph type under the
/// expected column headers.
#[test]
fn test_show_graph_types() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Register the node type, then a graph type that includes it.
    sess.execute("CREATE NODE TYPE Person (name STRING)").unwrap();
    let graph_type_ddl = "CREATE GRAPH TYPE social (NODE TYPE Person (name STRING))";
    sess.execute(graph_type_ddl).unwrap();

    let listing = sess.execute("SHOW GRAPH TYPES").unwrap();

    let expected_columns = vec!["name", "open", "node_types", "edge_types"];
    assert_eq!(listing.columns, expected_columns);
    assert_eq!(listing.rows.len(), 1);
    assert_eq!(listing.rows[0][0], Value::from("social"));
}
5484
/// `SHOW GRAPH TYPE <name>` returns exactly one row whose first column is the
/// requested graph type's name.
#[test]
fn test_show_graph_type_named() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Register the node type, then a graph type that includes it.
    sess.execute("CREATE NODE TYPE Person (name STRING)").unwrap();
    let graph_type_ddl = "CREATE GRAPH TYPE social (NODE TYPE Person (name STRING))";
    sess.execute(graph_type_ddl).unwrap();

    let described = sess.execute("SHOW GRAPH TYPE social").unwrap();
    assert_eq!(described.rows.len(), 1);
    assert_eq!(described.rows[0][0], Value::from("social"));
}
5505
/// `SHOW GRAPH TYPE` for a name that was never created returns an error.
#[test]
fn test_show_graph_type_not_found() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let lookup = sess.execute("SHOW GRAPH TYPE nonexistent");
    assert!(lookup.is_err());
}
5514
/// `SHOW INDEXES` on an empty database succeeds and reports the expected
/// column headers.
#[test]
fn test_show_indexes_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let listing = sess.execute("SHOW INDEXES").unwrap();

    let expected_columns = vec!["name", "type", "label", "property"];
    assert_eq!(listing.columns, expected_columns);
}
5523
/// `SHOW CONSTRAINTS` on an empty database succeeds and reports the expected
/// column headers.
#[test]
fn test_show_constraints_via_gql() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    let listing = sess.execute("SHOW CONSTRAINTS").unwrap();

    let expected_columns = vec!["name", "type", "label", "properties"];
    assert_eq!(listing.columns, expected_columns);
}
5532
/// A graph type declared with the pattern form (`(:A)-[:E]->(:B)`) can be
/// created and then read back with `SHOW GRAPH TYPE`.
#[test]
fn test_pattern_form_graph_type_roundtrip() {
    let database = GrafeoDB::new_in_memory();
    let sess = database.session();

    // Register the node and edge types that the patterns refer to.
    let type_ddl = [
        "CREATE NODE TYPE Person (name STRING NOT NULL)",
        "CREATE NODE TYPE City (name STRING)",
        "CREATE EDGE TYPE KNOWS (since INTEGER)",
        "CREATE EDGE TYPE LIVES_IN",
    ];
    for statement in type_ddl {
        sess.execute(statement).unwrap();
    }

    // Graph type built from two edge patterns; the pieces are joined without
    // any separator, so the statement reaches the engine as one line.
    let pattern_ddl = concat!(
        "CREATE GRAPH TYPE social (",
        "(:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),",
        "(:Person)-[:LIVES_IN]->(:City)",
        ")",
    );
    sess.execute(pattern_ddl).unwrap();

    // The new graph type must be retrievable by name.
    let described = sess.execute("SHOW GRAPH TYPE social").unwrap();
    assert_eq!(described.rows.len(), 1);
    assert_eq!(described.rows[0][0], Value::from("social"));
}
5565    }
5566}