Skip to main content

grafeo_engine/
session.rs

1//! Lightweight handles for database interaction.
2//!
3//! A session is your conversation with the database. Each session can have
4//! its own transaction state, so concurrent sessions don't interfere with
5//! each other. Sessions are cheap to create - spin up as many as you need.
6
7use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use grafeo_common::types::{EdgeId, EpochId, NodeId, TransactionId, Value};
12use grafeo_common::utils::error::Result;
13use grafeo_core::graph::Direction;
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::{Edge, LpgStore, Node};
16#[cfg(feature = "rdf")]
17use grafeo_core::graph::rdf::RdfStore;
18
19use crate::catalog::{Catalog, CatalogConstraintValidator};
20use crate::config::{AdaptiveConfig, GraphModel};
21use crate::database::QueryResult;
22use crate::query::cache::QueryCache;
23use crate::transaction::TransactionManager;
24
25/// Parses a DDL default-value literal string into a [`Value`].
26///
27/// Handles string literals (single- or double-quoted), integers, floats,
28/// booleans (`true`/`false`), and `NULL`.
29fn parse_default_literal(text: &str) -> Value {
30    if text.eq_ignore_ascii_case("null") {
31        return Value::Null;
32    }
33    if text.eq_ignore_ascii_case("true") {
34        return Value::Bool(true);
35    }
36    if text.eq_ignore_ascii_case("false") {
37        return Value::Bool(false);
38    }
39    // String literal: strip surrounding quotes
40    if (text.starts_with('\'') && text.ends_with('\''))
41        || (text.starts_with('"') && text.ends_with('"'))
42    {
43        return Value::String(text[1..text.len() - 1].into());
44    }
45    // Try integer, then float
46    if let Ok(i) = text.parse::<i64>() {
47        return Value::Int64(i);
48    }
49    if let Ok(f) = text.parse::<f64>() {
50        return Value::Float64(f);
51    }
52    // Fallback: treat as string
53    Value::String(text.into())
54}
55
56/// Runtime configuration for creating a new session.
57///
58/// Groups the shared parameters passed to all session constructors, keeping
59/// call sites readable and avoiding long argument lists.
60pub(crate) struct SessionConfig {
61    pub transaction_manager: Arc<TransactionManager>,
62    pub query_cache: Arc<QueryCache>,
63    pub catalog: Arc<Catalog>,
64    pub adaptive_config: AdaptiveConfig,
65    pub factorized_execution: bool,
66    pub graph_model: GraphModel,
67    pub query_timeout: Option<Duration>,
68    pub commit_counter: Arc<AtomicUsize>,
69    pub gc_interval: usize,
70}
71
72/// Your handle to the database - execute queries and manage transactions.
73///
74/// Get one from [`GrafeoDB::session()`](crate::GrafeoDB::session). Each session
75/// tracks its own transaction state, so you can have multiple concurrent
76/// sessions without them interfering.
77pub struct Session {
78    /// The underlying store.
79    store: Arc<LpgStore>,
80    /// Graph store trait object for pluggable storage backends.
81    graph_store: Arc<dyn GraphStoreMut>,
82    /// Schema and metadata catalog shared across sessions.
83    catalog: Arc<Catalog>,
84    /// RDF triple store (if RDF feature is enabled).
85    #[cfg(feature = "rdf")]
86    rdf_store: Arc<RdfStore>,
87    /// Transaction manager.
88    transaction_manager: Arc<TransactionManager>,
89    /// Query cache shared across sessions.
90    query_cache: Arc<QueryCache>,
91    /// Current transaction ID (if any). Behind a Mutex so that GQL commands
92    /// (`START TRANSACTION`, `COMMIT`, `ROLLBACK`) can manage transactions
93    /// from within `execute(&self)`.
94    current_transaction: parking_lot::Mutex<Option<TransactionId>>,
95    /// Whether the current transaction is read-only (blocks mutations).
96    read_only_tx: parking_lot::Mutex<bool>,
97    /// Whether the session is in auto-commit mode.
98    auto_commit: bool,
99    /// Adaptive execution configuration.
100    #[allow(dead_code)] // Stored for future adaptive re-optimization during execution
101    adaptive_config: AdaptiveConfig,
102    /// Whether to use factorized execution for multi-hop queries.
103    factorized_execution: bool,
104    /// The graph data model this session operates on.
105    graph_model: GraphModel,
106    /// Maximum time a query may run before being cancelled.
107    query_timeout: Option<Duration>,
108    /// Shared commit counter for triggering auto-GC.
109    commit_counter: Arc<AtomicUsize>,
110    /// GC every N commits (0 = disabled).
111    gc_interval: usize,
112    /// Node count at the start of the current transaction (for PreparedCommit stats).
113    transaction_start_node_count: AtomicUsize,
114    /// Edge count at the start of the current transaction (for PreparedCommit stats).
115    transaction_start_edge_count: AtomicUsize,
116    /// WAL for logging schema changes.
117    #[cfg(feature = "wal")]
118    wal: Option<Arc<grafeo_adapters::storage::wal::LpgWal>>,
119    /// Shared WAL graph context tracker for named graph awareness.
120    #[cfg(feature = "wal")]
121    wal_graph_context: Option<Arc<parking_lot::Mutex<Option<String>>>>,
122    /// CDC log for change tracking.
123    #[cfg(feature = "cdc")]
124    cdc_log: Arc<crate::cdc::CdcLog>,
125    /// Current graph name (for multi-graph USE GRAPH support). None = default graph.
126    current_graph: parking_lot::Mutex<Option<String>>,
127    /// Current schema name (ISO/IEC 39075 Section 4.7.3: independent from session graph).
128    /// None = "not set" (uses default schema).
129    current_schema: parking_lot::Mutex<Option<String>>,
130    /// Session time zone override.
131    time_zone: parking_lot::Mutex<Option<String>>,
132    /// Session-level parameters (SET PARAMETER).
133    session_params:
134        parking_lot::Mutex<std::collections::HashMap<String, grafeo_common::types::Value>>,
135    /// Override epoch for time-travel queries (None = use transaction/current epoch).
136    viewing_epoch_override: parking_lot::Mutex<Option<EpochId>>,
137    /// Savepoints within the current transaction.
138    savepoints: parking_lot::Mutex<Vec<SavepointState>>,
139    /// Nesting depth for nested transactions (0 = outermost).
140    /// Nested `START TRANSACTION` creates an auto-savepoint; nested `COMMIT`
141    /// releases it, nested `ROLLBACK` rolls back to it.
142    transaction_nesting_depth: parking_lot::Mutex<u32>,
143    /// Named graphs touched during the current transaction (for cross-graph atomicity).
144    /// `None` represents the default graph. Populated at `BEGIN` time and on each
145    /// `USE GRAPH` / `SESSION SET GRAPH` switch within a transaction.
146    touched_graphs: parking_lot::Mutex<Vec<Option<String>>>,
147}
148
/// Snapshot of a single graph's store state, taken when a savepoint is created.
#[derive(Clone)]
struct GraphSavepoint {
    /// Graph this snapshot belongs to.
    graph_name: Option<String>,
    /// Next node ID recorded at savepoint time.
    next_node_id: u64,
    /// Next edge ID recorded at savepoint time.
    next_edge_id: u64,
    /// Undo-log position recorded at savepoint time.
    undo_log_position: usize,
}
157
158/// Savepoint state: name + per-graph snapshots + the graph that was active.
159#[derive(Clone)]
160struct SavepointState {
161    name: String,
162    graph_snapshots: Vec<GraphSavepoint>,
163    /// The graph that was active when the savepoint was created.
164    /// Reserved for future use (e.g., restoring graph context on rollback).
165    #[allow(dead_code)]
166    active_graph: Option<String>,
167}
168
169impl Session {
170    /// Creates a new session with adaptive execution configuration.
171    #[allow(dead_code)]
172    pub(crate) fn with_adaptive(store: Arc<LpgStore>, cfg: SessionConfig) -> Self {
173        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
174        Self {
175            store,
176            graph_store,
177            catalog: cfg.catalog,
178            #[cfg(feature = "rdf")]
179            rdf_store: Arc::new(RdfStore::new()),
180            transaction_manager: cfg.transaction_manager,
181            query_cache: cfg.query_cache,
182            current_transaction: parking_lot::Mutex::new(None),
183            read_only_tx: parking_lot::Mutex::new(false),
184            auto_commit: true,
185            adaptive_config: cfg.adaptive_config,
186            factorized_execution: cfg.factorized_execution,
187            graph_model: cfg.graph_model,
188            query_timeout: cfg.query_timeout,
189            commit_counter: cfg.commit_counter,
190            gc_interval: cfg.gc_interval,
191            transaction_start_node_count: AtomicUsize::new(0),
192            transaction_start_edge_count: AtomicUsize::new(0),
193            #[cfg(feature = "wal")]
194            wal: None,
195            #[cfg(feature = "wal")]
196            wal_graph_context: None,
197            #[cfg(feature = "cdc")]
198            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
199            current_graph: parking_lot::Mutex::new(None),
200            current_schema: parking_lot::Mutex::new(None),
201            time_zone: parking_lot::Mutex::new(None),
202            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
203            viewing_epoch_override: parking_lot::Mutex::new(None),
204            savepoints: parking_lot::Mutex::new(Vec::new()),
205            transaction_nesting_depth: parking_lot::Mutex::new(0),
206            touched_graphs: parking_lot::Mutex::new(Vec::new()),
207        }
208    }
209
210    /// Sets the WAL for this session (shared with the database).
211    ///
212    /// This also wraps `graph_store` in a [`WalGraphStore`] so that mutation
213    /// operators (INSERT, DELETE, SET via queries) log to the WAL.
214    #[cfg(feature = "wal")]
215    pub(crate) fn set_wal(
216        &mut self,
217        wal: Arc<grafeo_adapters::storage::wal::LpgWal>,
218        wal_graph_context: Arc<parking_lot::Mutex<Option<String>>>,
219    ) {
220        // Wrap the graph store so query-engine mutations are WAL-logged
221        self.graph_store = Arc::new(crate::database::wal_store::WalGraphStore::new(
222            Arc::clone(&self.store),
223            Arc::clone(&wal),
224            Arc::clone(&wal_graph_context),
225        ));
226        self.wal = Some(wal);
227        self.wal_graph_context = Some(wal_graph_context);
228    }
229
230    /// Sets the CDC log for this session (shared with the database).
231    #[cfg(feature = "cdc")]
232    pub(crate) fn set_cdc_log(&mut self, cdc_log: Arc<crate::cdc::CdcLog>) {
233        self.cdc_log = cdc_log;
234    }
235
236    /// Creates a new session with RDF store and adaptive configuration.
237    #[cfg(feature = "rdf")]
238    pub(crate) fn with_rdf_store_and_adaptive(
239        store: Arc<LpgStore>,
240        rdf_store: Arc<RdfStore>,
241        cfg: SessionConfig,
242    ) -> Self {
243        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
244        Self {
245            store,
246            graph_store,
247            catalog: cfg.catalog,
248            rdf_store,
249            transaction_manager: cfg.transaction_manager,
250            query_cache: cfg.query_cache,
251            current_transaction: parking_lot::Mutex::new(None),
252            read_only_tx: parking_lot::Mutex::new(false),
253            auto_commit: true,
254            adaptive_config: cfg.adaptive_config,
255            factorized_execution: cfg.factorized_execution,
256            graph_model: cfg.graph_model,
257            query_timeout: cfg.query_timeout,
258            commit_counter: cfg.commit_counter,
259            gc_interval: cfg.gc_interval,
260            transaction_start_node_count: AtomicUsize::new(0),
261            transaction_start_edge_count: AtomicUsize::new(0),
262            #[cfg(feature = "wal")]
263            wal: None,
264            #[cfg(feature = "wal")]
265            wal_graph_context: None,
266            #[cfg(feature = "cdc")]
267            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
268            current_graph: parking_lot::Mutex::new(None),
269            current_schema: parking_lot::Mutex::new(None),
270            time_zone: parking_lot::Mutex::new(None),
271            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
272            viewing_epoch_override: parking_lot::Mutex::new(None),
273            savepoints: parking_lot::Mutex::new(Vec::new()),
274            transaction_nesting_depth: parking_lot::Mutex::new(0),
275            touched_graphs: parking_lot::Mutex::new(Vec::new()),
276        }
277    }
278
279    /// Creates a session backed by an external graph store.
280    ///
281    /// The external store handles all data operations. Transaction management
282    /// (begin/commit/rollback) is not supported for external stores.
283    ///
284    /// # Errors
285    ///
286    /// Returns an error if the internal arena allocation fails (out of memory).
287    pub(crate) fn with_external_store(
288        store: Arc<dyn GraphStoreMut>,
289        cfg: SessionConfig,
290    ) -> Result<Self> {
291        Ok(Self {
292            store: Arc::new(LpgStore::new()?),
293            graph_store: store,
294            catalog: cfg.catalog,
295            #[cfg(feature = "rdf")]
296            rdf_store: Arc::new(RdfStore::new()),
297            transaction_manager: cfg.transaction_manager,
298            query_cache: cfg.query_cache,
299            current_transaction: parking_lot::Mutex::new(None),
300            read_only_tx: parking_lot::Mutex::new(false),
301            auto_commit: true,
302            adaptive_config: cfg.adaptive_config,
303            factorized_execution: cfg.factorized_execution,
304            graph_model: cfg.graph_model,
305            query_timeout: cfg.query_timeout,
306            commit_counter: cfg.commit_counter,
307            gc_interval: cfg.gc_interval,
308            transaction_start_node_count: AtomicUsize::new(0),
309            transaction_start_edge_count: AtomicUsize::new(0),
310            #[cfg(feature = "wal")]
311            wal: None,
312            #[cfg(feature = "wal")]
313            wal_graph_context: None,
314            #[cfg(feature = "cdc")]
315            cdc_log: Arc::new(crate::cdc::CdcLog::new()),
316            current_graph: parking_lot::Mutex::new(None),
317            current_schema: parking_lot::Mutex::new(None),
318            time_zone: parking_lot::Mutex::new(None),
319            session_params: parking_lot::Mutex::new(std::collections::HashMap::new()),
320            viewing_epoch_override: parking_lot::Mutex::new(None),
321            savepoints: parking_lot::Mutex::new(Vec::new()),
322            transaction_nesting_depth: parking_lot::Mutex::new(0),
323            touched_graphs: parking_lot::Mutex::new(Vec::new()),
324        })
325    }
326
327    /// Returns the graph model this session operates on.
328    #[must_use]
329    pub fn graph_model(&self) -> GraphModel {
330        self.graph_model
331    }
332
333    // === Session State Management ===
334
335    /// Sets the current graph for this session (USE GRAPH).
336    pub fn use_graph(&self, name: &str) {
337        *self.current_graph.lock() = Some(name.to_string());
338    }
339
340    /// Returns the current graph name, if any.
341    #[must_use]
342    pub fn current_graph(&self) -> Option<String> {
343        self.current_graph.lock().clone()
344    }
345
346    /// Sets the current schema for this session (SESSION SET SCHEMA).
347    ///
348    /// Per ISO/IEC 39075 Section 7.1 GR1, this is independent of the session graph.
349    pub fn set_schema(&self, name: &str) {
350        *self.current_schema.lock() = Some(name.to_string());
351    }
352
353    /// Returns the current schema name, if any.
354    ///
355    /// `None` means "not set", which resolves to the default schema.
356    #[must_use]
357    pub fn current_schema(&self) -> Option<String> {
358        self.current_schema.lock().clone()
359    }
360
361    /// Computes the effective storage key for a graph, accounting for schema context.
362    ///
363    /// Per ISO/IEC 39075 Section 17.2, graphs resolve relative to the current schema.
364    /// Uses `/` as separator since it is invalid in GQL identifiers.
365    fn effective_graph_key(&self, graph_name: &str) -> String {
366        let schema = self.current_schema.lock().clone();
367        match schema {
368            Some(s) => format!("{s}/{graph_name}"),
369            None => graph_name.to_string(),
370        }
371    }
372
373    /// Returns the effective storage key for the current graph, accounting for schema.
374    ///
375    /// Combines `current_schema` and `current_graph` into a flat lookup key.
376    fn active_graph_storage_key(&self) -> Option<String> {
377        let graph = self.current_graph.lock().clone();
378        let schema = self.current_schema.lock().clone();
379        match (schema, graph) {
380            (_, None) => None,
381            (_, Some(ref name)) if name.eq_ignore_ascii_case("default") => None,
382            (None, Some(name)) => Some(name),
383            (Some(s), Some(g)) => Some(format!("{s}/{g}")),
384        }
385    }
386
387    /// Returns the graph store for the currently active graph.
388    ///
389    /// If `current_graph` is `None` or `"default"`, returns the session's
390    /// default `graph_store` (already WAL-wrapped for the default graph).
391    /// Otherwise looks up the named graph in the root store and wraps it
392    /// in a [`WalGraphStore`] so mutations are WAL-logged with the correct
393    /// graph context.
394    fn active_store(&self) -> Arc<dyn GraphStoreMut> {
395        let key = self.active_graph_storage_key();
396        match key {
397            None => Arc::clone(&self.graph_store),
398            Some(ref name) => match self.store.graph(name) {
399                Some(named_store) => {
400                    #[cfg(feature = "wal")]
401                    if let (Some(wal), Some(ctx)) = (&self.wal, &self.wal_graph_context) {
402                        return Arc::new(crate::database::wal_store::WalGraphStore::new_for_graph(
403                            named_store,
404                            Arc::clone(wal),
405                            name.clone(),
406                            Arc::clone(ctx),
407                        )) as Arc<dyn GraphStoreMut>;
408                    }
409                    named_store as Arc<dyn GraphStoreMut>
410                }
411                None => Arc::clone(&self.graph_store),
412            },
413        }
414    }
415
416    /// Returns the concrete `LpgStore` for the currently active graph.
417    ///
418    /// Used by direct CRUD methods that need the concrete store type
419    /// for versioned operations.
420    fn active_lpg_store(&self) -> Arc<LpgStore> {
421        let key = self.active_graph_storage_key();
422        match key {
423            None => Arc::clone(&self.store),
424            Some(ref name) => self
425                .store
426                .graph(name)
427                .unwrap_or_else(|| Arc::clone(&self.store)),
428        }
429    }
430
431    /// Resolves a graph name to a concrete `LpgStore`.
432    /// `None` and `"default"` resolve to the session's root store.
433    fn resolve_store(&self, graph_name: &Option<String>) -> Arc<LpgStore> {
434        match graph_name {
435            None => Arc::clone(&self.store),
436            Some(name) if name.eq_ignore_ascii_case("default") => Arc::clone(&self.store),
437            Some(name) => self
438                .store
439                .graph(name)
440                .unwrap_or_else(|| Arc::clone(&self.store)),
441        }
442    }
443
444    /// Records the current graph as "touched" if a transaction is active.
445    ///
446    /// Uses the full storage key (schema/graph) so that commit/rollback
447    /// can resolve the correct store via `resolve_store`.
448    fn track_graph_touch(&self) {
449        if self.current_transaction.lock().is_some() {
450            let key = self.active_graph_storage_key();
451            let mut touched = self.touched_graphs.lock();
452            if !touched.contains(&key) {
453                touched.push(key);
454            }
455        }
456    }
457
458    /// Sets the session time zone.
459    pub fn set_time_zone(&self, tz: &str) {
460        *self.time_zone.lock() = Some(tz.to_string());
461    }
462
463    /// Returns the session time zone, if set.
464    #[must_use]
465    pub fn time_zone(&self) -> Option<String> {
466        self.time_zone.lock().clone()
467    }
468
469    /// Sets a session parameter.
470    pub fn set_parameter(&self, key: &str, value: grafeo_common::types::Value) {
471        self.session_params.lock().insert(key.to_string(), value);
472    }
473
474    /// Gets a session parameter by cloning it.
475    #[must_use]
476    pub fn get_parameter(&self, key: &str) -> Option<grafeo_common::types::Value> {
477        self.session_params.lock().get(key).cloned()
478    }
479
480    /// Resets all session state to defaults (ISO/IEC 39075 Section 7.2).
481    pub fn reset_session(&self) {
482        *self.current_schema.lock() = None;
483        *self.current_graph.lock() = None;
484        *self.time_zone.lock() = None;
485        self.session_params.lock().clear();
486        *self.viewing_epoch_override.lock() = None;
487    }
488
489    /// Resets only the session schema (Section 7.2 GR1).
490    pub fn reset_schema(&self) {
491        *self.current_schema.lock() = None;
492    }
493
494    /// Resets only the session graph (Section 7.2 GR2).
495    pub fn reset_graph(&self) {
496        *self.current_graph.lock() = None;
497    }
498
499    /// Resets only the session time zone (Section 7.2 GR3).
500    pub fn reset_time_zone(&self) {
501        *self.time_zone.lock() = None;
502    }
503
504    /// Resets only session parameters (Section 7.2 GR4).
505    pub fn reset_parameters(&self) {
506        self.session_params.lock().clear();
507    }
508
509    // --- Time-travel API ---
510
511    /// Sets a viewing epoch override for time-travel queries.
512    ///
513    /// While set, all queries on this session see the database as it existed
514    /// at the given epoch. Use [`clear_viewing_epoch`](Self::clear_viewing_epoch)
515    /// to return to normal behavior.
516    pub fn set_viewing_epoch(&self, epoch: EpochId) {
517        *self.viewing_epoch_override.lock() = Some(epoch);
518    }
519
520    /// Clears the viewing epoch override, returning to normal behavior.
521    pub fn clear_viewing_epoch(&self) {
522        *self.viewing_epoch_override.lock() = None;
523    }
524
525    /// Returns the current viewing epoch override, if any.
526    #[must_use]
527    pub fn viewing_epoch(&self) -> Option<EpochId> {
528        *self.viewing_epoch_override.lock()
529    }
530
531    /// Returns all versions of a node with their creation/deletion epochs.
532    ///
533    /// Properties and labels reflect the current state (not versioned per-epoch).
534    #[must_use]
535    pub fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
536        self.active_lpg_store().get_node_history(id)
537    }
538
539    /// Returns all versions of an edge with their creation/deletion epochs.
540    ///
541    /// Properties reflect the current state (not versioned per-epoch).
542    #[must_use]
543    pub fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
544        self.active_lpg_store().get_edge_history(id)
545    }
546
547    /// Checks that the session's graph model supports LPG operations.
548    fn require_lpg(&self, language: &str) -> Result<()> {
549        if self.graph_model == GraphModel::Rdf {
550            return Err(grafeo_common::utils::error::Error::Internal(format!(
551                "This is an RDF database. {language} queries require an LPG database."
552            )));
553        }
554        Ok(())
555    }
556
557    /// Executes a session or transaction command, returning an empty result.
558    #[cfg(feature = "gql")]
559    fn execute_session_command(
560        &self,
561        cmd: grafeo_adapters::query::gql::ast::SessionCommand,
562    ) -> Result<QueryResult> {
563        use grafeo_adapters::query::gql::ast::{SessionCommand, TransactionIsolationLevel};
564        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
565
566        // Block DDL in read-only transactions (ISO/IEC 39075 Section 8)
567        if *self.read_only_tx.lock() {
568            match &cmd {
569                SessionCommand::CreateGraph { .. } | SessionCommand::DropGraph { .. } => {
570                    return Err(Error::Transaction(
571                        grafeo_common::utils::error::TransactionError::ReadOnly,
572                    ));
573                }
574                _ => {} // Session state + transaction control allowed
575            }
576        }
577
578        match cmd {
579            SessionCommand::CreateGraph {
580                name,
581                if_not_exists,
582                typed,
583                like_graph,
584                copy_of,
585                open: _,
586            } => {
587                // ISO/IEC 39075 Section 12.4: graphs are created within the current schema
588                let storage_key = self.effective_graph_key(&name);
589
590                // Validate source graph exists for LIKE / AS COPY OF
591                if let Some(ref src) = like_graph {
592                    let src_key = self.effective_graph_key(src);
593                    if self.store.graph(&src_key).is_none() {
594                        return Err(Error::Query(QueryError::new(
595                            QueryErrorKind::Semantic,
596                            format!("Source graph '{src}' does not exist"),
597                        )));
598                    }
599                }
600                if let Some(ref src) = copy_of {
601                    let src_key = self.effective_graph_key(src);
602                    if self.store.graph(&src_key).is_none() {
603                        return Err(Error::Query(QueryError::new(
604                            QueryErrorKind::Semantic,
605                            format!("Source graph '{src}' does not exist"),
606                        )));
607                    }
608                }
609
610                let created = self
611                    .store
612                    .create_graph(&storage_key)
613                    .map_err(|e| Error::Internal(e.to_string()))?;
614                if !created && !if_not_exists {
615                    return Err(Error::Query(QueryError::new(
616                        QueryErrorKind::Semantic,
617                        format!("Graph '{name}' already exists"),
618                    )));
619                }
620                if created {
621                    #[cfg(feature = "wal")]
622                    self.log_schema_wal(
623                        &grafeo_adapters::storage::wal::WalRecord::CreateNamedGraph {
624                            name: storage_key.clone(),
625                        },
626                    );
627                }
628
629                // AS COPY OF: copy data from source graph
630                if let Some(ref src) = copy_of {
631                    let src_key = self.effective_graph_key(src);
632                    self.store
633                        .copy_graph(Some(&src_key), Some(&storage_key))
634                        .map_err(|e| Error::Internal(e.to_string()))?;
635                }
636
637                // Bind to graph type if specified
638                if let Some(type_name) = typed
639                    && let Err(e) = self
640                        .catalog
641                        .bind_graph_type(&storage_key, type_name.clone())
642                {
643                    return Err(Error::Query(QueryError::new(
644                        QueryErrorKind::Semantic,
645                        e.to_string(),
646                    )));
647                }
648
649                // LIKE: copy graph type binding from source
650                if let Some(ref src) = like_graph {
651                    let src_key = self.effective_graph_key(src);
652                    if let Some(src_type) = self.catalog.get_graph_type_binding(&src_key) {
653                        let _ = self.catalog.bind_graph_type(&storage_key, src_type);
654                    }
655                }
656
657                Ok(QueryResult::empty())
658            }
659            SessionCommand::DropGraph { name, if_exists } => {
660                let storage_key = self.effective_graph_key(&name);
661                let dropped = self.store.drop_graph(&storage_key);
662                if !dropped && !if_exists {
663                    return Err(Error::Query(QueryError::new(
664                        QueryErrorKind::Semantic,
665                        format!("Graph '{name}' does not exist"),
666                    )));
667                }
668                if dropped {
669                    #[cfg(feature = "wal")]
670                    self.log_schema_wal(
671                        &grafeo_adapters::storage::wal::WalRecord::DropNamedGraph {
672                            name: storage_key.clone(),
673                        },
674                    );
675                    // If this session was using the dropped graph, reset to default
676                    let mut current = self.current_graph.lock();
677                    if current
678                        .as_deref()
679                        .is_some_and(|g| g.eq_ignore_ascii_case(&name))
680                    {
681                        *current = None;
682                    }
683                }
684                Ok(QueryResult::empty())
685            }
686            SessionCommand::UseGraph(name) => {
687                // Verify graph exists (resolve within current schema)
688                let effective_key = self.effective_graph_key(&name);
689                if !name.eq_ignore_ascii_case("default")
690                    && self.store.graph(&effective_key).is_none()
691                {
692                    return Err(Error::Query(QueryError::new(
693                        QueryErrorKind::Semantic,
694                        format!("Graph '{name}' does not exist"),
695                    )));
696                }
697                self.use_graph(&name);
698                // Track the new graph if in a transaction
699                self.track_graph_touch();
700                Ok(QueryResult::empty())
701            }
702            SessionCommand::SessionSetGraph(name) => {
703                // ISO/IEC 39075 Section 7.1 GR2: set session graph (resolved within current schema)
704                let effective_key = self.effective_graph_key(&name);
705                if !name.eq_ignore_ascii_case("default")
706                    && self.store.graph(&effective_key).is_none()
707                {
708                    return Err(Error::Query(QueryError::new(
709                        QueryErrorKind::Semantic,
710                        format!("Graph '{name}' does not exist"),
711                    )));
712                }
713                self.use_graph(&name);
714                // Track the new graph if in a transaction
715                self.track_graph_touch();
716                Ok(QueryResult::empty())
717            }
718            SessionCommand::SessionSetSchema(name) => {
719                // ISO/IEC 39075 Section 7.1 GR1: set session schema (independent of graph)
720                if !self.catalog.schema_exists(&name) {
721                    return Err(Error::Query(QueryError::new(
722                        QueryErrorKind::Semantic,
723                        format!("Schema '{name}' does not exist"),
724                    )));
725                }
726                self.set_schema(&name);
727                Ok(QueryResult::empty())
728            }
729            SessionCommand::SessionSetTimeZone(tz) => {
730                self.set_time_zone(&tz);
731                Ok(QueryResult::empty())
732            }
733            SessionCommand::SessionSetParameter(key, expr) => {
734                if key.eq_ignore_ascii_case("viewing_epoch") {
735                    match Self::eval_integer_literal(&expr) {
736                        Some(n) if n >= 0 => {
737                            self.set_viewing_epoch(EpochId::new(n as u64));
738                            Ok(QueryResult::status(format!("Set viewing_epoch to {n}")))
739                        }
740                        _ => Err(Error::Query(QueryError::new(
741                            QueryErrorKind::Semantic,
742                            "viewing_epoch must be a non-negative integer literal",
743                        ))),
744                    }
745                } else {
746                    // For now, store parameter name with Null value.
747                    // Full expression evaluation would require building and executing a plan.
748                    self.set_parameter(&key, Value::Null);
749                    Ok(QueryResult::empty())
750                }
751            }
752            SessionCommand::SessionReset(target) => {
753                use grafeo_adapters::query::gql::ast::SessionResetTarget;
754                match target {
755                    SessionResetTarget::All => self.reset_session(),
756                    SessionResetTarget::Schema => self.reset_schema(),
757                    SessionResetTarget::Graph => self.reset_graph(),
758                    SessionResetTarget::TimeZone => self.reset_time_zone(),
759                    SessionResetTarget::Parameters => self.reset_parameters(),
760                }
761                Ok(QueryResult::empty())
762            }
763            SessionCommand::SessionClose => {
764                self.reset_session();
765                Ok(QueryResult::empty())
766            }
767            SessionCommand::StartTransaction {
768                read_only,
769                isolation_level,
770            } => {
771                let engine_level = isolation_level.map(|l| match l {
772                    TransactionIsolationLevel::ReadCommitted => {
773                        crate::transaction::IsolationLevel::ReadCommitted
774                    }
775                    TransactionIsolationLevel::SnapshotIsolation => {
776                        crate::transaction::IsolationLevel::SnapshotIsolation
777                    }
778                    TransactionIsolationLevel::Serializable => {
779                        crate::transaction::IsolationLevel::Serializable
780                    }
781                });
782                self.begin_transaction_inner(read_only, engine_level)?;
783                Ok(QueryResult::status("Transaction started"))
784            }
785            SessionCommand::Commit => {
786                self.commit_inner()?;
787                Ok(QueryResult::status("Transaction committed"))
788            }
789            SessionCommand::Rollback => {
790                self.rollback_inner()?;
791                Ok(QueryResult::status("Transaction rolled back"))
792            }
793            SessionCommand::Savepoint(name) => {
794                self.savepoint(&name)?;
795                Ok(QueryResult::status(format!("Savepoint '{name}' created")))
796            }
797            SessionCommand::RollbackToSavepoint(name) => {
798                self.rollback_to_savepoint(&name)?;
799                Ok(QueryResult::status(format!(
800                    "Rolled back to savepoint '{name}'"
801                )))
802            }
803            SessionCommand::ReleaseSavepoint(name) => {
804                self.release_savepoint(&name)?;
805                Ok(QueryResult::status(format!("Savepoint '{name}' released")))
806            }
807        }
808    }
809
810    /// Best-effort append of a schema-change record to the WAL, if one is attached.
811    #[cfg(feature = "wal")]
812    fn log_schema_wal(&self, record: &grafeo_adapters::storage::wal::WalRecord) {
813        let Some(wal) = &self.wal else { return };
814        // A failed append is only reported as a warning; it is never propagated.
815        if let Err(e) = wal.log(record) {
816            tracing::warn!("Failed to log schema change to WAL: {}", e);
817        }
818    }
819
820    /// Executes a schema DDL command, returning a status result.
821    #[cfg(feature = "gql")]
822    fn execute_schema_command(
823        &self,
824        cmd: grafeo_adapters::query::gql::ast::SchemaStatement,
825    ) -> Result<QueryResult> {
826        use crate::catalog::{
827            EdgeTypeDefinition, NodeTypeDefinition, PropertyDataType, TypedProperty,
828        };
829        use grafeo_adapters::query::gql::ast::SchemaStatement;
830        #[cfg(feature = "wal")]
831        use grafeo_adapters::storage::wal::WalRecord;
832        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
833
834        /// Logs a WAL record for schema changes. Compiles to nothing without `wal`.
835        macro_rules! wal_log {
836            ($self:expr, $record:expr) => {
837                #[cfg(feature = "wal")]
838                $self.log_schema_wal(&$record);
839            };
840        }
841
842        let result = match cmd {
843            SchemaStatement::CreateNodeType(stmt) => {
844                #[cfg(feature = "wal")]
845                let props_for_wal: Vec<(String, String, bool)> = stmt
846                    .properties
847                    .iter()
848                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
849                    .collect();
850                let def = NodeTypeDefinition {
851                    name: stmt.name.clone(),
852                    properties: stmt
853                        .properties
854                        .iter()
855                        .map(|p| TypedProperty {
856                            name: p.name.clone(),
857                            data_type: PropertyDataType::from_type_name(&p.data_type),
858                            nullable: p.nullable,
859                            default_value: p
860                                .default_value
861                                .as_ref()
862                                .map(|s| parse_default_literal(s)),
863                        })
864                        .collect(),
865                    constraints: Vec::new(),
866                    parent_types: stmt.parent_types.clone(),
867                };
868                let result = if stmt.or_replace {
869                    let _ = self.catalog.drop_node_type(&stmt.name);
870                    self.catalog.register_node_type(def)
871                } else {
872                    self.catalog.register_node_type(def)
873                };
874                match result {
875                    Ok(()) => {
876                        wal_log!(
877                            self,
878                            WalRecord::CreateNodeType {
879                                name: stmt.name.clone(),
880                                properties: props_for_wal,
881                                constraints: Vec::new(),
882                            }
883                        );
884                        Ok(QueryResult::status(format!(
885                            "Created node type '{}'",
886                            stmt.name
887                        )))
888                    }
889                    Err(e) if stmt.if_not_exists => {
890                        let _ = e;
891                        Ok(QueryResult::status("No change"))
892                    }
893                    Err(e) => Err(Error::Query(QueryError::new(
894                        QueryErrorKind::Semantic,
895                        e.to_string(),
896                    ))),
897                }
898            }
899            SchemaStatement::CreateEdgeType(stmt) => {
900                #[cfg(feature = "wal")]
901                let props_for_wal: Vec<(String, String, bool)> = stmt
902                    .properties
903                    .iter()
904                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
905                    .collect();
906                let def = EdgeTypeDefinition {
907                    name: stmt.name.clone(),
908                    properties: stmt
909                        .properties
910                        .iter()
911                        .map(|p| TypedProperty {
912                            name: p.name.clone(),
913                            data_type: PropertyDataType::from_type_name(&p.data_type),
914                            nullable: p.nullable,
915                            default_value: p
916                                .default_value
917                                .as_ref()
918                                .map(|s| parse_default_literal(s)),
919                        })
920                        .collect(),
921                    constraints: Vec::new(),
922                    source_node_types: stmt.source_node_types.clone(),
923                    target_node_types: stmt.target_node_types.clone(),
924                };
925                let result = if stmt.or_replace {
926                    let _ = self.catalog.drop_edge_type_def(&stmt.name);
927                    self.catalog.register_edge_type_def(def)
928                } else {
929                    self.catalog.register_edge_type_def(def)
930                };
931                match result {
932                    Ok(()) => {
933                        wal_log!(
934                            self,
935                            WalRecord::CreateEdgeType {
936                                name: stmt.name.clone(),
937                                properties: props_for_wal,
938                                constraints: Vec::new(),
939                            }
940                        );
941                        Ok(QueryResult::status(format!(
942                            "Created edge type '{}'",
943                            stmt.name
944                        )))
945                    }
946                    Err(e) if stmt.if_not_exists => {
947                        let _ = e;
948                        Ok(QueryResult::status("No change"))
949                    }
950                    Err(e) => Err(Error::Query(QueryError::new(
951                        QueryErrorKind::Semantic,
952                        e.to_string(),
953                    ))),
954                }
955            }
956            SchemaStatement::CreateVectorIndex(stmt) => {
957                Self::create_vector_index_on_store(
958                    &self.active_lpg_store(),
959                    &stmt.node_label,
960                    &stmt.property,
961                    stmt.dimensions,
962                    stmt.metric.as_deref(),
963                )?;
964                wal_log!(
965                    self,
966                    WalRecord::CreateIndex {
967                        name: stmt.name.clone(),
968                        label: stmt.node_label.clone(),
969                        property: stmt.property.clone(),
970                        index_type: "vector".to_string(),
971                    }
972                );
973                Ok(QueryResult::status(format!(
974                    "Created vector index '{}'",
975                    stmt.name
976                )))
977            }
978            SchemaStatement::DropNodeType { name, if_exists } => {
979                match self.catalog.drop_node_type(&name) {
980                    Ok(()) => {
981                        wal_log!(self, WalRecord::DropNodeType { name: name.clone() });
982                        Ok(QueryResult::status(format!("Dropped node type '{name}'")))
983                    }
984                    Err(e) if if_exists => {
985                        let _ = e;
986                        Ok(QueryResult::status("No change"))
987                    }
988                    Err(e) => Err(Error::Query(QueryError::new(
989                        QueryErrorKind::Semantic,
990                        e.to_string(),
991                    ))),
992                }
993            }
994            SchemaStatement::DropEdgeType { name, if_exists } => {
995                match self.catalog.drop_edge_type_def(&name) {
996                    Ok(()) => {
997                        wal_log!(self, WalRecord::DropEdgeType { name: name.clone() });
998                        Ok(QueryResult::status(format!("Dropped edge type '{name}'")))
999                    }
1000                    Err(e) if if_exists => {
1001                        let _ = e;
1002                        Ok(QueryResult::status("No change"))
1003                    }
1004                    Err(e) => Err(Error::Query(QueryError::new(
1005                        QueryErrorKind::Semantic,
1006                        e.to_string(),
1007                    ))),
1008                }
1009            }
1010            SchemaStatement::CreateIndex(stmt) => {
1011                use grafeo_adapters::query::gql::ast::IndexKind;
1012                let active = self.active_lpg_store();
1013                let index_type_str = match stmt.index_kind {
1014                    IndexKind::Property => "property",
1015                    IndexKind::BTree => "btree",
1016                    IndexKind::Text => "text",
1017                    IndexKind::Vector => "vector",
1018                };
1019                match stmt.index_kind {
1020                    IndexKind::Property | IndexKind::BTree => {
1021                        for prop in &stmt.properties {
1022                            active.create_property_index(prop);
1023                        }
1024                    }
1025                    IndexKind::Text => {
1026                        for prop in &stmt.properties {
1027                            Self::create_text_index_on_store(&active, &stmt.label, prop)?;
1028                        }
1029                    }
1030                    IndexKind::Vector => {
1031                        for prop in &stmt.properties {
1032                            Self::create_vector_index_on_store(
1033                                &active,
1034                                &stmt.label,
1035                                prop,
1036                                stmt.options.dimensions,
1037                                stmt.options.metric.as_deref(),
1038                            )?;
1039                        }
1040                    }
1041                }
1042                #[cfg(feature = "wal")]
1043                for prop in &stmt.properties {
1044                    wal_log!(
1045                        self,
1046                        WalRecord::CreateIndex {
1047                            name: stmt.name.clone(),
1048                            label: stmt.label.clone(),
1049                            property: prop.clone(),
1050                            index_type: index_type_str.to_string(),
1051                        }
1052                    );
1053                }
1054                Ok(QueryResult::status(format!(
1055                    "Created {} index '{}'",
1056                    index_type_str, stmt.name
1057                )))
1058            }
1059            SchemaStatement::DropIndex { name, if_exists } => {
1060                // Try to drop property index by name
1061                let dropped = self.active_lpg_store().drop_property_index(&name);
1062                if dropped || if_exists {
1063                    if dropped {
1064                        wal_log!(self, WalRecord::DropIndex { name: name.clone() });
1065                    }
1066                    Ok(QueryResult::status(if dropped {
1067                        format!("Dropped index '{name}'")
1068                    } else {
1069                        "No change".to_string()
1070                    }))
1071                } else {
1072                    Err(Error::Query(QueryError::new(
1073                        QueryErrorKind::Semantic,
1074                        format!("Index '{name}' does not exist"),
1075                    )))
1076                }
1077            }
1078            SchemaStatement::CreateConstraint(stmt) => {
1079                use crate::catalog::TypeConstraint;
1080                use grafeo_adapters::query::gql::ast::ConstraintKind;
1081                let kind_str = match stmt.constraint_kind {
1082                    ConstraintKind::Unique => "unique",
1083                    ConstraintKind::NodeKey => "node_key",
1084                    ConstraintKind::NotNull => "not_null",
1085                    ConstraintKind::Exists => "exists",
1086                };
1087                let constraint_name = stmt
1088                    .name
1089                    .clone()
1090                    .unwrap_or_else(|| format!("{}_{kind_str}", stmt.label));
1091
1092                // Register constraint in catalog type definitions
1093                match stmt.constraint_kind {
1094                    ConstraintKind::Unique => {
1095                        for prop in &stmt.properties {
1096                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1097                            let prop_id = self.catalog.get_or_create_property_key(prop);
1098                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1099                        }
1100                        let _ = self.catalog.add_constraint_to_type(
1101                            &stmt.label,
1102                            TypeConstraint::Unique(stmt.properties.clone()),
1103                        );
1104                    }
1105                    ConstraintKind::NodeKey => {
1106                        for prop in &stmt.properties {
1107                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1108                            let prop_id = self.catalog.get_or_create_property_key(prop);
1109                            let _ = self.catalog.add_unique_constraint(label_id, prop_id);
1110                            let _ = self.catalog.add_required_property(label_id, prop_id);
1111                        }
1112                        let _ = self.catalog.add_constraint_to_type(
1113                            &stmt.label,
1114                            TypeConstraint::PrimaryKey(stmt.properties.clone()),
1115                        );
1116                    }
1117                    ConstraintKind::NotNull | ConstraintKind::Exists => {
1118                        for prop in &stmt.properties {
1119                            let label_id = self.catalog.get_or_create_label(&stmt.label);
1120                            let prop_id = self.catalog.get_or_create_property_key(prop);
1121                            let _ = self.catalog.add_required_property(label_id, prop_id);
1122                            let _ = self.catalog.add_constraint_to_type(
1123                                &stmt.label,
1124                                TypeConstraint::NotNull(prop.clone()),
1125                            );
1126                        }
1127                    }
1128                }
1129
1130                wal_log!(
1131                    self,
1132                    WalRecord::CreateConstraint {
1133                        name: constraint_name.clone(),
1134                        label: stmt.label.clone(),
1135                        properties: stmt.properties.clone(),
1136                        kind: kind_str.to_string(),
1137                    }
1138                );
1139                Ok(QueryResult::status(format!(
1140                    "Created {kind_str} constraint '{constraint_name}'"
1141                )))
1142            }
1143            SchemaStatement::DropConstraint { name, if_exists } => {
1144                let _ = if_exists;
1145                wal_log!(self, WalRecord::DropConstraint { name: name.clone() });
1146                Ok(QueryResult::status(format!("Dropped constraint '{name}'")))
1147            }
1148            SchemaStatement::CreateGraphType(stmt) => {
1149                use crate::catalog::GraphTypeDefinition;
1150                use grafeo_adapters::query::gql::ast::InlineElementType;
1151
1152                // GG04: LIKE clause copies type from existing graph
1153                let (mut node_types, mut edge_types, open) =
1154                    if let Some(ref like_graph) = stmt.like_graph {
1155                        // Infer types from the graph's bound type, or use its existing types
1156                        if let Some(type_name) = self.catalog.get_graph_type_binding(like_graph) {
1157                            if let Some(existing) = self
1158                                .catalog
1159                                .schema()
1160                                .and_then(|s| s.get_graph_type(&type_name))
1161                            {
1162                                (
1163                                    existing.allowed_node_types.clone(),
1164                                    existing.allowed_edge_types.clone(),
1165                                    existing.open,
1166                                )
1167                            } else {
1168                                (Vec::new(), Vec::new(), true)
1169                            }
1170                        } else {
1171                            // GG22: Infer from graph data (labels used in graph)
1172                            let nt = self.catalog.all_node_type_names();
1173                            let et = self.catalog.all_edge_type_names();
1174                            if nt.is_empty() && et.is_empty() {
1175                                (Vec::new(), Vec::new(), true)
1176                            } else {
1177                                (nt, et, false)
1178                            }
1179                        }
1180                    } else {
1181                        (stmt.node_types.clone(), stmt.edge_types.clone(), stmt.open)
1182                    };
1183
1184                // GG03: Register inline element types and add their names
1185                for inline in &stmt.inline_types {
1186                    match inline {
1187                        InlineElementType::Node {
1188                            name,
1189                            properties,
1190                            key_labels,
1191                            ..
1192                        } => {
1193                            let def = NodeTypeDefinition {
1194                                name: name.clone(),
1195                                properties: properties
1196                                    .iter()
1197                                    .map(|p| TypedProperty {
1198                                        name: p.name.clone(),
1199                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1200                                        nullable: p.nullable,
1201                                        default_value: None,
1202                                    })
1203                                    .collect(),
1204                                constraints: Vec::new(),
1205                                parent_types: key_labels.clone(),
1206                            };
1207                            // Register or replace so inline defs override existing
1208                            self.catalog.register_or_replace_node_type(def);
1209                            #[cfg(feature = "wal")]
1210                            {
1211                                let props_for_wal: Vec<(String, String, bool)> = properties
1212                                    .iter()
1213                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1214                                    .collect();
1215                                self.log_schema_wal(&WalRecord::CreateNodeType {
1216                                    name: name.clone(),
1217                                    properties: props_for_wal,
1218                                    constraints: Vec::new(),
1219                                });
1220                            }
1221                            if !node_types.contains(name) {
1222                                node_types.push(name.clone());
1223                            }
1224                        }
1225                        InlineElementType::Edge {
1226                            name,
1227                            properties,
1228                            source_node_types,
1229                            target_node_types,
1230                            ..
1231                        } => {
1232                            let def = EdgeTypeDefinition {
1233                                name: name.clone(),
1234                                properties: properties
1235                                    .iter()
1236                                    .map(|p| TypedProperty {
1237                                        name: p.name.clone(),
1238                                        data_type: PropertyDataType::from_type_name(&p.data_type),
1239                                        nullable: p.nullable,
1240                                        default_value: None,
1241                                    })
1242                                    .collect(),
1243                                constraints: Vec::new(),
1244                                source_node_types: source_node_types.clone(),
1245                                target_node_types: target_node_types.clone(),
1246                            };
1247                            self.catalog.register_or_replace_edge_type_def(def);
1248                            #[cfg(feature = "wal")]
1249                            {
1250                                let props_for_wal: Vec<(String, String, bool)> = properties
1251                                    .iter()
1252                                    .map(|p| (p.name.clone(), p.data_type.clone(), p.nullable))
1253                                    .collect();
1254                                self.log_schema_wal(&WalRecord::CreateEdgeType {
1255                                    name: name.clone(),
1256                                    properties: props_for_wal,
1257                                    constraints: Vec::new(),
1258                                });
1259                            }
1260                            if !edge_types.contains(name) {
1261                                edge_types.push(name.clone());
1262                            }
1263                        }
1264                    }
1265                }
1266
1267                let def = GraphTypeDefinition {
1268                    name: stmt.name.clone(),
1269                    allowed_node_types: node_types.clone(),
1270                    allowed_edge_types: edge_types.clone(),
1271                    open,
1272                };
1273                let result = if stmt.or_replace {
1274                    // Drop existing first, ignore error if not found
1275                    let _ = self.catalog.drop_graph_type(&stmt.name);
1276                    self.catalog.register_graph_type(def)
1277                } else {
1278                    self.catalog.register_graph_type(def)
1279                };
1280                match result {
1281                    Ok(()) => {
1282                        wal_log!(
1283                            self,
1284                            WalRecord::CreateGraphType {
1285                                name: stmt.name.clone(),
1286                                node_types,
1287                                edge_types,
1288                                open,
1289                            }
1290                        );
1291                        Ok(QueryResult::status(format!(
1292                            "Created graph type '{}'",
1293                            stmt.name
1294                        )))
1295                    }
1296                    Err(e) if stmt.if_not_exists => {
1297                        let _ = e;
1298                        Ok(QueryResult::status("No change"))
1299                    }
1300                    Err(e) => Err(Error::Query(QueryError::new(
1301                        QueryErrorKind::Semantic,
1302                        e.to_string(),
1303                    ))),
1304                }
1305            }
1306            SchemaStatement::DropGraphType { name, if_exists } => {
1307                match self.catalog.drop_graph_type(&name) {
1308                    Ok(()) => {
1309                        wal_log!(self, WalRecord::DropGraphType { name: name.clone() });
1310                        Ok(QueryResult::status(format!("Dropped graph type '{name}'")))
1311                    }
1312                    Err(e) if if_exists => {
1313                        let _ = e;
1314                        Ok(QueryResult::status("No change"))
1315                    }
1316                    Err(e) => Err(Error::Query(QueryError::new(
1317                        QueryErrorKind::Semantic,
1318                        e.to_string(),
1319                    ))),
1320                }
1321            }
1322            SchemaStatement::CreateSchema {
1323                name,
1324                if_not_exists,
1325            } => match self.catalog.register_schema_namespace(name.clone()) {
1326                Ok(()) => {
1327                    wal_log!(self, WalRecord::CreateSchema { name: name.clone() });
1328                    Ok(QueryResult::status(format!("Created schema '{name}'")))
1329                }
1330                Err(e) if if_not_exists => {
1331                    let _ = e;
1332                    Ok(QueryResult::status("No change"))
1333                }
1334                Err(e) => Err(Error::Query(QueryError::new(
1335                    QueryErrorKind::Semantic,
1336                    e.to_string(),
1337                ))),
1338            },
1339            SchemaStatement::DropSchema { name, if_exists } => {
1340                // ISO/IEC 39075 Section 12.3: schema must be empty before dropping
1341                let prefix = format!("{name}/");
1342                let has_graphs = self
1343                    .store
1344                    .graph_names()
1345                    .iter()
1346                    .any(|g| g.starts_with(&prefix));
1347                if has_graphs {
1348                    return Err(Error::Query(QueryError::new(
1349                        QueryErrorKind::Semantic,
1350                        format!(
1351                            "Schema '{name}' is not empty: drop all graphs in the schema first"
1352                        ),
1353                    )));
1354                }
1355                match self.catalog.drop_schema_namespace(&name) {
1356                    Ok(()) => {
1357                        wal_log!(self, WalRecord::DropSchema { name: name.clone() });
1358                        // If this session was using the dropped schema, reset it
1359                        let mut current = self.current_schema.lock();
1360                        if current
1361                            .as_deref()
1362                            .is_some_and(|s| s.eq_ignore_ascii_case(&name))
1363                        {
1364                            *current = None;
1365                        }
1366                        Ok(QueryResult::status(format!("Dropped schema '{name}'")))
1367                    }
1368                    Err(e) if if_exists => {
1369                        let _ = e;
1370                        Ok(QueryResult::status("No change"))
1371                    }
1372                    Err(e) => Err(Error::Query(QueryError::new(
1373                        QueryErrorKind::Semantic,
1374                        e.to_string(),
1375                    ))),
1376                }
1377            }
1378            SchemaStatement::AlterNodeType(stmt) => {
1379                use grafeo_adapters::query::gql::ast::TypeAlteration;
1380                let mut wal_alts = Vec::new();
1381                for alt in &stmt.alterations {
1382                    match alt {
1383                        TypeAlteration::AddProperty(prop) => {
1384                            let typed = TypedProperty {
1385                                name: prop.name.clone(),
1386                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1387                                nullable: prop.nullable,
1388                                default_value: prop
1389                                    .default_value
1390                                    .as_ref()
1391                                    .map(|s| parse_default_literal(s)),
1392                            };
1393                            self.catalog
1394                                .alter_node_type_add_property(&stmt.name, typed)
1395                                .map_err(|e| {
1396                                    Error::Query(QueryError::new(
1397                                        QueryErrorKind::Semantic,
1398                                        e.to_string(),
1399                                    ))
1400                                })?;
1401                            wal_alts.push((
1402                                "add".to_string(),
1403                                prop.name.clone(),
1404                                prop.data_type.clone(),
1405                                prop.nullable,
1406                            ));
1407                        }
1408                        TypeAlteration::DropProperty(name) => {
1409                            self.catalog
1410                                .alter_node_type_drop_property(&stmt.name, name)
1411                                .map_err(|e| {
1412                                    Error::Query(QueryError::new(
1413                                        QueryErrorKind::Semantic,
1414                                        e.to_string(),
1415                                    ))
1416                                })?;
1417                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1418                        }
1419                    }
1420                }
1421                wal_log!(
1422                    self,
1423                    WalRecord::AlterNodeType {
1424                        name: stmt.name.clone(),
1425                        alterations: wal_alts,
1426                    }
1427                );
1428                Ok(QueryResult::status(format!(
1429                    "Altered node type '{}'",
1430                    stmt.name
1431                )))
1432            }
1433            SchemaStatement::AlterEdgeType(stmt) => {
1434                use grafeo_adapters::query::gql::ast::TypeAlteration;
1435                let mut wal_alts = Vec::new();
1436                for alt in &stmt.alterations {
1437                    match alt {
1438                        TypeAlteration::AddProperty(prop) => {
1439                            let typed = TypedProperty {
1440                                name: prop.name.clone(),
1441                                data_type: PropertyDataType::from_type_name(&prop.data_type),
1442                                nullable: prop.nullable,
1443                                default_value: prop
1444                                    .default_value
1445                                    .as_ref()
1446                                    .map(|s| parse_default_literal(s)),
1447                            };
1448                            self.catalog
1449                                .alter_edge_type_add_property(&stmt.name, typed)
1450                                .map_err(|e| {
1451                                    Error::Query(QueryError::new(
1452                                        QueryErrorKind::Semantic,
1453                                        e.to_string(),
1454                                    ))
1455                                })?;
1456                            wal_alts.push((
1457                                "add".to_string(),
1458                                prop.name.clone(),
1459                                prop.data_type.clone(),
1460                                prop.nullable,
1461                            ));
1462                        }
1463                        TypeAlteration::DropProperty(name) => {
1464                            self.catalog
1465                                .alter_edge_type_drop_property(&stmt.name, name)
1466                                .map_err(|e| {
1467                                    Error::Query(QueryError::new(
1468                                        QueryErrorKind::Semantic,
1469                                        e.to_string(),
1470                                    ))
1471                                })?;
1472                            wal_alts.push(("drop".to_string(), name.clone(), String::new(), false));
1473                        }
1474                    }
1475                }
1476                wal_log!(
1477                    self,
1478                    WalRecord::AlterEdgeType {
1479                        name: stmt.name.clone(),
1480                        alterations: wal_alts,
1481                    }
1482                );
1483                Ok(QueryResult::status(format!(
1484                    "Altered edge type '{}'",
1485                    stmt.name
1486                )))
1487            }
1488            SchemaStatement::AlterGraphType(stmt) => {
1489                use grafeo_adapters::query::gql::ast::GraphTypeAlteration;
1490                let mut wal_alts = Vec::new();
1491                for alt in &stmt.alterations {
1492                    match alt {
1493                        GraphTypeAlteration::AddNodeType(name) => {
1494                            self.catalog
1495                                .alter_graph_type_add_node_type(&stmt.name, name.clone())
1496                                .map_err(|e| {
1497                                    Error::Query(QueryError::new(
1498                                        QueryErrorKind::Semantic,
1499                                        e.to_string(),
1500                                    ))
1501                                })?;
1502                            wal_alts.push(("add_node_type".to_string(), name.clone()));
1503                        }
1504                        GraphTypeAlteration::DropNodeType(name) => {
1505                            self.catalog
1506                                .alter_graph_type_drop_node_type(&stmt.name, name)
1507                                .map_err(|e| {
1508                                    Error::Query(QueryError::new(
1509                                        QueryErrorKind::Semantic,
1510                                        e.to_string(),
1511                                    ))
1512                                })?;
1513                            wal_alts.push(("drop_node_type".to_string(), name.clone()));
1514                        }
1515                        GraphTypeAlteration::AddEdgeType(name) => {
1516                            self.catalog
1517                                .alter_graph_type_add_edge_type(&stmt.name, name.clone())
1518                                .map_err(|e| {
1519                                    Error::Query(QueryError::new(
1520                                        QueryErrorKind::Semantic,
1521                                        e.to_string(),
1522                                    ))
1523                                })?;
1524                            wal_alts.push(("add_edge_type".to_string(), name.clone()));
1525                        }
1526                        GraphTypeAlteration::DropEdgeType(name) => {
1527                            self.catalog
1528                                .alter_graph_type_drop_edge_type(&stmt.name, name)
1529                                .map_err(|e| {
1530                                    Error::Query(QueryError::new(
1531                                        QueryErrorKind::Semantic,
1532                                        e.to_string(),
1533                                    ))
1534                                })?;
1535                            wal_alts.push(("drop_edge_type".to_string(), name.clone()));
1536                        }
1537                    }
1538                }
1539                wal_log!(
1540                    self,
1541                    WalRecord::AlterGraphType {
1542                        name: stmt.name.clone(),
1543                        alterations: wal_alts,
1544                    }
1545                );
1546                Ok(QueryResult::status(format!(
1547                    "Altered graph type '{}'",
1548                    stmt.name
1549                )))
1550            }
1551            SchemaStatement::CreateProcedure(stmt) => {
1552                use crate::catalog::ProcedureDefinition;
1553
1554                let def = ProcedureDefinition {
1555                    name: stmt.name.clone(),
1556                    params: stmt
1557                        .params
1558                        .iter()
1559                        .map(|p| (p.name.clone(), p.param_type.clone()))
1560                        .collect(),
1561                    returns: stmt
1562                        .returns
1563                        .iter()
1564                        .map(|r| (r.name.clone(), r.return_type.clone()))
1565                        .collect(),
1566                    body: stmt.body.clone(),
1567                };
1568
1569                if stmt.or_replace {
1570                    self.catalog.replace_procedure(def).map_err(|e| {
1571                        Error::Query(QueryError::new(QueryErrorKind::Semantic, e.to_string()))
1572                    })?;
1573                } else {
1574                    match self.catalog.register_procedure(def) {
1575                        Ok(()) => {}
1576                        Err(_) if stmt.if_not_exists => {
1577                            return Ok(QueryResult::empty());
1578                        }
1579                        Err(e) => {
1580                            return Err(Error::Query(QueryError::new(
1581                                QueryErrorKind::Semantic,
1582                                e.to_string(),
1583                            )));
1584                        }
1585                    }
1586                }
1587
1588                wal_log!(
1589                    self,
1590                    WalRecord::CreateProcedure {
1591                        name: stmt.name.clone(),
1592                        params: stmt
1593                            .params
1594                            .iter()
1595                            .map(|p| (p.name.clone(), p.param_type.clone()))
1596                            .collect(),
1597                        returns: stmt
1598                            .returns
1599                            .iter()
1600                            .map(|r| (r.name.clone(), r.return_type.clone()))
1601                            .collect(),
1602                        body: stmt.body,
1603                    }
1604                );
1605                Ok(QueryResult::status(format!(
1606                    "Created procedure '{}'",
1607                    stmt.name
1608                )))
1609            }
1610            SchemaStatement::DropProcedure { name, if_exists } => {
1611                match self.catalog.drop_procedure(&name) {
1612                    Ok(()) => {}
1613                    Err(_) if if_exists => {
1614                        return Ok(QueryResult::empty());
1615                    }
1616                    Err(e) => {
1617                        return Err(Error::Query(QueryError::new(
1618                            QueryErrorKind::Semantic,
1619                            e.to_string(),
1620                        )));
1621                    }
1622                }
1623                wal_log!(self, WalRecord::DropProcedure { name: name.clone() });
1624                Ok(QueryResult::status(format!("Dropped procedure '{name}'")))
1625            }
1626            SchemaStatement::ShowIndexes => {
1627                return self.execute_show_indexes();
1628            }
1629            SchemaStatement::ShowConstraints => {
1630                return self.execute_show_constraints();
1631            }
1632            SchemaStatement::ShowNodeTypes => {
1633                return self.execute_show_node_types();
1634            }
1635            SchemaStatement::ShowEdgeTypes => {
1636                return self.execute_show_edge_types();
1637            }
1638            SchemaStatement::ShowGraphTypes => {
1639                return self.execute_show_graph_types();
1640            }
1641            SchemaStatement::ShowGraphType(name) => {
1642                return self.execute_show_graph_type(&name);
1643            }
1644            SchemaStatement::ShowCurrentGraphType => {
1645                return self.execute_show_current_graph_type();
1646            }
1647            SchemaStatement::ShowGraphs => {
1648                return self.execute_show_graphs();
1649            }
1650            SchemaStatement::ShowSchemas => {
1651                return self.execute_show_schemas();
1652            }
1653        };
1654
1655        // Invalidate all cached query plans after any successful DDL change.
1656        // DDL is rare, so clearing the entire cache is cheap and correct.
1657        if result.is_ok() {
1658            self.query_cache.clear();
1659        }
1660
1661        result
1662    }
1663
1664    /// Creates a vector index on the store by scanning existing nodes.
1665    #[cfg(all(feature = "gql", feature = "vector-index"))]
1666    fn create_vector_index_on_store(
1667        store: &LpgStore,
1668        label: &str,
1669        property: &str,
1670        dimensions: Option<usize>,
1671        metric: Option<&str>,
1672    ) -> Result<()> {
1673        use grafeo_common::types::{PropertyKey, Value};
1674        use grafeo_common::utils::error::Error;
1675        use grafeo_core::index::vector::{DistanceMetric, HnswConfig, HnswIndex};
1676
1677        let metric = match metric {
1678            Some(m) => DistanceMetric::from_str(m).ok_or_else(|| {
1679                Error::Internal(format!(
1680                    "Unknown distance metric '{m}'. Use: cosine, euclidean, dot_product, manhattan"
1681                ))
1682            })?,
1683            None => DistanceMetric::Cosine,
1684        };
1685
1686        let prop_key = PropertyKey::new(property);
1687        let mut found_dims: Option<usize> = dimensions;
1688        let mut vectors: Vec<(grafeo_common::types::NodeId, Vec<f32>)> = Vec::new();
1689
1690        for node in store.nodes_with_label(label) {
1691            if let Some(Value::Vector(v)) = node.properties.get(&prop_key) {
1692                if let Some(expected) = found_dims {
1693                    if v.len() != expected {
1694                        return Err(Error::Internal(format!(
1695                            "Vector dimension mismatch: expected {expected}, found {} on node {}",
1696                            v.len(),
1697                            node.id.0
1698                        )));
1699                    }
1700                } else {
1701                    found_dims = Some(v.len());
1702                }
1703                vectors.push((node.id, v.to_vec()));
1704            }
1705        }
1706
1707        let Some(dims) = found_dims else {
1708            return Err(Error::Internal(format!(
1709                "No vector properties found on :{label}({property}) and no dimensions specified"
1710            )));
1711        };
1712
1713        let config = HnswConfig::new(dims, metric);
1714        let index = HnswIndex::with_capacity(config, vectors.len());
1715        let accessor = grafeo_core::index::vector::PropertyVectorAccessor::new(store, property);
1716        for (node_id, vec) in &vectors {
1717            index.insert(*node_id, vec, &accessor);
1718        }
1719
1720        store.add_vector_index(label, property, Arc::new(index));
1721        Ok(())
1722    }
1723
1724    /// Stub for when vector-index feature is not enabled.
1725    #[cfg(all(feature = "gql", not(feature = "vector-index")))]
1726    fn create_vector_index_on_store(
1727        _store: &LpgStore,
1728        _label: &str,
1729        _property: &str,
1730        _dimensions: Option<usize>,
1731        _metric: Option<&str>,
1732    ) -> Result<()> {
1733        Err(grafeo_common::utils::error::Error::Internal(
1734            "Vector index support requires the 'vector-index' feature".to_string(),
1735        ))
1736    }
1737
1738    /// Creates a text index on the store by scanning existing nodes.
1739    #[cfg(all(feature = "gql", feature = "text-index"))]
1740    fn create_text_index_on_store(store: &LpgStore, label: &str, property: &str) -> Result<()> {
1741        use grafeo_common::types::{PropertyKey, Value};
1742        use grafeo_core::index::text::{BM25Config, InvertedIndex};
1743
1744        let mut index = InvertedIndex::new(BM25Config::default());
1745        let prop_key = PropertyKey::new(property);
1746
1747        let nodes = store.nodes_by_label(label);
1748        for node_id in nodes {
1749            if let Some(Value::String(text)) = store.get_node_property(node_id, &prop_key) {
1750                index.insert(node_id, text.as_str());
1751            }
1752        }
1753
1754        store.add_text_index(label, property, Arc::new(parking_lot::RwLock::new(index)));
1755        Ok(())
1756    }
1757
1758    /// Stub for when text-index feature is not enabled.
1759    #[cfg(all(feature = "gql", not(feature = "text-index")))]
1760    fn create_text_index_on_store(_store: &LpgStore, _label: &str, _property: &str) -> Result<()> {
1761        Err(grafeo_common::utils::error::Error::Internal(
1762            "Text index support requires the 'text-index' feature".to_string(),
1763        ))
1764    }
1765
1766    /// Returns a table of all indexes from the catalog.
1767    fn execute_show_indexes(&self) -> Result<QueryResult> {
1768        let indexes = self.catalog.all_indexes();
1769        let columns = vec![
1770            "name".to_string(),
1771            "type".to_string(),
1772            "label".to_string(),
1773            "property".to_string(),
1774        ];
1775        let rows: Vec<Vec<Value>> = indexes
1776            .into_iter()
1777            .map(|def| {
1778                let label_name = self
1779                    .catalog
1780                    .get_label_name(def.label)
1781                    .unwrap_or_else(|| "?".into());
1782                let prop_name = self
1783                    .catalog
1784                    .get_property_key_name(def.property_key)
1785                    .unwrap_or_else(|| "?".into());
1786                vec![
1787                    Value::from(format!("idx_{}_{}", label_name, prop_name)),
1788                    Value::from(format!("{:?}", def.index_type)),
1789                    Value::from(&*label_name),
1790                    Value::from(&*prop_name),
1791                ]
1792            })
1793            .collect();
1794        Ok(QueryResult {
1795            columns,
1796            column_types: Vec::new(),
1797            rows,
1798            ..QueryResult::empty()
1799        })
1800    }
1801
1802    /// Returns a table of all constraints (currently metadata-only).
1803    fn execute_show_constraints(&self) -> Result<QueryResult> {
1804        // Constraints are tracked in WAL but not yet in a queryable catalog.
1805        // Return an empty table with the expected schema.
1806        Ok(QueryResult {
1807            columns: vec![
1808                "name".to_string(),
1809                "type".to_string(),
1810                "label".to_string(),
1811                "properties".to_string(),
1812            ],
1813            column_types: Vec::new(),
1814            rows: Vec::new(),
1815            ..QueryResult::empty()
1816        })
1817    }
1818
1819    /// Returns a table of all registered node types.
1820    fn execute_show_node_types(&self) -> Result<QueryResult> {
1821        let columns = vec![
1822            "name".to_string(),
1823            "properties".to_string(),
1824            "constraints".to_string(),
1825            "parents".to_string(),
1826        ];
1827        let type_names = self.catalog.all_node_type_names();
1828        let rows: Vec<Vec<Value>> = type_names
1829            .into_iter()
1830            .filter_map(|name| {
1831                let def = self.catalog.get_node_type(&name)?;
1832                let props: Vec<String> = def
1833                    .properties
1834                    .iter()
1835                    .map(|p| {
1836                        let nullable = if p.nullable { "" } else { " NOT NULL" };
1837                        format!("{} {}{}", p.name, p.data_type, nullable)
1838                    })
1839                    .collect();
1840                let constraints: Vec<String> =
1841                    def.constraints.iter().map(|c| format!("{c:?}")).collect();
1842                let parents = def.parent_types.join(", ");
1843                Some(vec![
1844                    Value::from(name),
1845                    Value::from(props.join(", ")),
1846                    Value::from(constraints.join(", ")),
1847                    Value::from(parents),
1848                ])
1849            })
1850            .collect();
1851        Ok(QueryResult {
1852            columns,
1853            column_types: Vec::new(),
1854            rows,
1855            ..QueryResult::empty()
1856        })
1857    }
1858
1859    /// Returns a table of all registered edge types.
1860    fn execute_show_edge_types(&self) -> Result<QueryResult> {
1861        let columns = vec![
1862            "name".to_string(),
1863            "properties".to_string(),
1864            "source_types".to_string(),
1865            "target_types".to_string(),
1866        ];
1867        let type_names = self.catalog.all_edge_type_names();
1868        let rows: Vec<Vec<Value>> = type_names
1869            .into_iter()
1870            .filter_map(|name| {
1871                let def = self.catalog.get_edge_type_def(&name)?;
1872                let props: Vec<String> = def
1873                    .properties
1874                    .iter()
1875                    .map(|p| {
1876                        let nullable = if p.nullable { "" } else { " NOT NULL" };
1877                        format!("{} {}{}", p.name, p.data_type, nullable)
1878                    })
1879                    .collect();
1880                let src = def.source_node_types.join(", ");
1881                let tgt = def.target_node_types.join(", ");
1882                Some(vec![
1883                    Value::from(name),
1884                    Value::from(props.join(", ")),
1885                    Value::from(src),
1886                    Value::from(tgt),
1887                ])
1888            })
1889            .collect();
1890        Ok(QueryResult {
1891            columns,
1892            column_types: Vec::new(),
1893            rows,
1894            ..QueryResult::empty()
1895        })
1896    }
1897
1898    /// Returns a table of all registered graph types.
1899    fn execute_show_graph_types(&self) -> Result<QueryResult> {
1900        let columns = vec![
1901            "name".to_string(),
1902            "open".to_string(),
1903            "node_types".to_string(),
1904            "edge_types".to_string(),
1905        ];
1906        let type_names = self.catalog.all_graph_type_names();
1907        let rows: Vec<Vec<Value>> = type_names
1908            .into_iter()
1909            .filter_map(|name| {
1910                let def = self.catalog.get_graph_type_def(&name)?;
1911                Some(vec![
1912                    Value::from(name),
1913                    Value::from(def.open),
1914                    Value::from(def.allowed_node_types.join(", ")),
1915                    Value::from(def.allowed_edge_types.join(", ")),
1916                ])
1917            })
1918            .collect();
1919        Ok(QueryResult {
1920            columns,
1921            column_types: Vec::new(),
1922            rows,
1923            ..QueryResult::empty()
1924        })
1925    }
1926
1927    /// Returns the list of named graphs visible in the current schema context.
1928    ///
1929    /// When a session schema is set, only graphs belonging to that schema are
1930    /// shown (their compound prefix is stripped). When no schema is set, graphs
1931    /// without a schema prefix are shown (the default schema).
1932    fn execute_show_graphs(&self) -> Result<QueryResult> {
1933        let schema = self.current_schema.lock().clone();
1934        let all_names = self.store.graph_names();
1935
1936        let mut names: Vec<String> = match &schema {
1937            Some(s) => {
1938                let prefix = format!("{s}/");
1939                all_names
1940                    .into_iter()
1941                    .filter_map(|n| n.strip_prefix(&prefix).map(String::from))
1942                    .collect()
1943            }
1944            None => all_names.into_iter().filter(|n| !n.contains('/')).collect(),
1945        };
1946        names.sort();
1947
1948        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
1949        Ok(QueryResult {
1950            columns: vec!["name".to_string()],
1951            column_types: Vec::new(),
1952            rows,
1953            ..QueryResult::empty()
1954        })
1955    }
1956
1957    /// Returns the list of all schema namespaces.
1958    fn execute_show_schemas(&self) -> Result<QueryResult> {
1959        let mut names = self.catalog.schema_names();
1960        names.sort();
1961        let rows: Vec<Vec<Value>> = names.into_iter().map(|n| vec![Value::from(n)]).collect();
1962        Ok(QueryResult {
1963            columns: vec!["name".to_string()],
1964            column_types: Vec::new(),
1965            rows,
1966            ..QueryResult::empty()
1967        })
1968    }
1969
1970    /// Returns detailed info for a specific graph type.
1971    fn execute_show_graph_type(&self, name: &str) -> Result<QueryResult> {
1972        use grafeo_common::utils::error::{Error, QueryError, QueryErrorKind};
1973
1974        let def = self.catalog.get_graph_type_def(name).ok_or_else(|| {
1975            Error::Query(QueryError::new(
1976                QueryErrorKind::Semantic,
1977                format!("Graph type '{name}' not found"),
1978            ))
1979        })?;
1980
1981        let columns = vec![
1982            "name".to_string(),
1983            "open".to_string(),
1984            "node_types".to_string(),
1985            "edge_types".to_string(),
1986        ];
1987        let rows = vec![vec![
1988            Value::from(def.name),
1989            Value::from(def.open),
1990            Value::from(def.allowed_node_types.join(", ")),
1991            Value::from(def.allowed_edge_types.join(", ")),
1992        ]];
1993        Ok(QueryResult {
1994            columns,
1995            column_types: Vec::new(),
1996            rows,
1997            ..QueryResult::empty()
1998        })
1999    }
2000
2001    /// Returns the graph type bound to the current graph.
2002    fn execute_show_current_graph_type(&self) -> Result<QueryResult> {
2003        let graph_name = self
2004            .current_graph()
2005            .unwrap_or_else(|| "default".to_string());
2006        let columns = vec![
2007            "graph".to_string(),
2008            "graph_type".to_string(),
2009            "open".to_string(),
2010            "node_types".to_string(),
2011            "edge_types".to_string(),
2012        ];
2013
2014        if let Some(type_name) = self.catalog.get_graph_type_binding(&graph_name)
2015            && let Some(def) = self.catalog.get_graph_type_def(&type_name)
2016        {
2017            let rows = vec![vec![
2018                Value::from(graph_name),
2019                Value::from(type_name),
2020                Value::from(def.open),
2021                Value::from(def.allowed_node_types.join(", ")),
2022                Value::from(def.allowed_edge_types.join(", ")),
2023            ]];
2024            return Ok(QueryResult {
2025                columns,
2026                column_types: Vec::new(),
2027                rows,
2028                ..QueryResult::empty()
2029            });
2030        }
2031
2032        // No graph type binding found
2033        Ok(QueryResult {
2034            columns,
2035            column_types: Vec::new(),
2036            rows: vec![vec![
2037                Value::from(graph_name),
2038                Value::Null,
2039                Value::Null,
2040                Value::Null,
2041                Value::Null,
2042            ]],
2043            ..QueryResult::empty()
2044        })
2045    }
2046
2047    /// Executes a GQL query.
2048    ///
2049    /// # Errors
2050    ///
2051    /// Returns an error if the query fails to parse or execute.
2052    ///
2053    /// # Examples
2054    ///
2055    /// ```no_run
2056    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2057    /// use grafeo_engine::GrafeoDB;
2058    ///
2059    /// let db = GrafeoDB::new_in_memory();
2060    /// let session = db.session();
2061    ///
2062    /// // Create a node
2063    /// session.execute("INSERT (:Person {name: 'Alix', age: 30})")?;
2064    ///
2065    /// // Query nodes
2066    /// let result = session.execute("MATCH (n:Person) RETURN n.name, n.age")?;
2067    /// for row in &result.rows {
2068    ///     println!("{:?}", row);
2069    /// }
2070    /// # Ok(())
2071    /// # }
2072    /// ```
2073    #[cfg(feature = "gql")]
2074    pub fn execute(&self, query: &str) -> Result<QueryResult> {
2075        self.require_lpg("GQL")?;
2076
2077        use crate::query::{
2078            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2079            processor::QueryLanguage, translators::gql,
2080        };
2081
2082        #[cfg(not(target_arch = "wasm32"))]
2083        let start_time = std::time::Instant::now();
2084
2085        // Parse and translate, checking for session/schema commands first
2086        let translation = gql::translate_full(query)?;
2087        let logical_plan = match translation {
2088            gql::GqlTranslationResult::SessionCommand(cmd) => {
2089                return self.execute_session_command(cmd);
2090            }
2091            gql::GqlTranslationResult::SchemaCommand(cmd) => {
2092                // All DDL is a write operation
2093                if *self.read_only_tx.lock() {
2094                    return Err(grafeo_common::utils::error::Error::Transaction(
2095                        grafeo_common::utils::error::TransactionError::ReadOnly,
2096                    ));
2097                }
2098                return self.execute_schema_command(cmd);
2099            }
2100            gql::GqlTranslationResult::Plan(plan) => {
2101                // Block mutations in read-only transactions
2102                if *self.read_only_tx.lock() && plan.root.has_mutations() {
2103                    return Err(grafeo_common::utils::error::Error::Transaction(
2104                        grafeo_common::utils::error::TransactionError::ReadOnly,
2105                    ));
2106                }
2107                plan
2108            }
2109        };
2110
2111        // Create cache key for this query
2112        let cache_key = CacheKey::with_graph(query, QueryLanguage::Gql, self.current_graph());
2113
2114        // Try to get cached optimized plan, or use the plan we just translated
2115        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2116            cached_plan
2117        } else {
2118            // Semantic validation
2119            let mut binder = Binder::new();
2120            let _binding_context = binder.bind(&logical_plan)?;
2121
2122            // Optimize the plan
2123            let active = self.active_store();
2124            let optimizer = Optimizer::from_graph_store(&*active);
2125            let plan = optimizer.optimize(logical_plan)?;
2126
2127            // Cache the optimized plan for future use
2128            self.query_cache.put_optimized(cache_key, plan.clone());
2129
2130            plan
2131        };
2132
2133        // Resolve the active store for query execution
2134        let active = self.active_store();
2135
2136        // EXPLAIN: annotate pushdown hints and return the plan tree
2137        if optimized_plan.explain {
2138            use crate::query::processor::{annotate_pushdown_hints, explain_result};
2139            let mut plan = optimized_plan;
2140            annotate_pushdown_hints(&mut plan.root, active.as_ref());
2141            return Ok(explain_result(&plan));
2142        }
2143
2144        // PROFILE: execute with per-operator instrumentation
2145        if optimized_plan.profile {
2146            let has_mutations = optimized_plan.root.has_mutations();
2147            return self.with_auto_commit(has_mutations, || {
2148                let (viewing_epoch, transaction_id) = self.get_transaction_context();
2149                let planner = self.create_planner_for_store(
2150                    Arc::clone(&active),
2151                    viewing_epoch,
2152                    transaction_id,
2153                );
2154                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2155
2156                let executor = Executor::with_columns(physical_plan.columns.clone())
2157                    .with_deadline(self.query_deadline());
2158                let _result = executor.execute(physical_plan.operator.as_mut())?;
2159
2160                let total_time_ms;
2161                #[cfg(not(target_arch = "wasm32"))]
2162                {
2163                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2164                }
2165                #[cfg(target_arch = "wasm32")]
2166                {
2167                    total_time_ms = 0.0;
2168                }
2169
2170                let profile_tree = crate::query::profile::build_profile_tree(
2171                    &optimized_plan.root,
2172                    &mut entries.into_iter(),
2173                );
2174                Ok(crate::query::profile::profile_result(
2175                    &profile_tree,
2176                    total_time_ms,
2177                ))
2178            });
2179        }
2180
2181        let has_mutations = optimized_plan.root.has_mutations();
2182
2183        self.with_auto_commit(has_mutations, || {
2184            // Get transaction context for MVCC visibility
2185            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2186
2187            // Convert to physical plan with transaction context
2188            // (Physical planning cannot be cached as it depends on transaction state)
2189            let planner =
2190                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2191            let mut physical_plan = planner.plan(&optimized_plan)?;
2192
2193            // Execute the plan
2194            let executor = Executor::with_columns(physical_plan.columns.clone())
2195                .with_deadline(self.query_deadline());
2196            let mut result = executor.execute(physical_plan.operator.as_mut())?;
2197
2198            // Add execution metrics
2199            let rows_scanned = result.rows.len() as u64;
2200            #[cfg(not(target_arch = "wasm32"))]
2201            {
2202                let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2203                result.execution_time_ms = Some(elapsed_ms);
2204            }
2205            result.rows_scanned = Some(rows_scanned);
2206
2207            Ok(result)
2208        })
2209    }
2210
2211    /// Executes a GQL query with visibility at the specified epoch.
2212    ///
2213    /// This enables time-travel queries: the query sees the database
2214    /// as it existed at the given epoch.
2215    ///
2216    /// # Errors
2217    ///
2218    /// Returns an error if parsing or execution fails.
2219    #[cfg(feature = "gql")]
2220    pub fn execute_at_epoch(&self, query: &str, epoch: EpochId) -> Result<QueryResult> {
2221        let previous = self.viewing_epoch_override.lock().replace(epoch);
2222        let result = self.execute(query);
2223        *self.viewing_epoch_override.lock() = previous;
2224        result
2225    }
2226
2227    /// Executes a GQL query with parameters.
2228    ///
2229    /// # Errors
2230    ///
2231    /// Returns an error if the query fails to parse or execute.
2232    #[cfg(feature = "gql")]
2233    pub fn execute_with_params(
2234        &self,
2235        query: &str,
2236        params: std::collections::HashMap<String, Value>,
2237    ) -> Result<QueryResult> {
2238        self.require_lpg("GQL")?;
2239
2240        use crate::query::processor::{QueryLanguage, QueryProcessor};
2241
2242        let has_mutations = Self::query_looks_like_mutation(query);
2243        let active = self.active_store();
2244
2245        self.with_auto_commit(has_mutations, || {
2246            // Get transaction context for MVCC visibility
2247            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2248
2249            // Create processor with transaction context
2250            let processor = QueryProcessor::for_graph_store_with_transaction(
2251                Arc::clone(&active),
2252                Arc::clone(&self.transaction_manager),
2253            )?;
2254
2255            // Apply transaction context if in a transaction
2256            let processor = if let Some(transaction_id) = transaction_id {
2257                processor.with_transaction_context(viewing_epoch, transaction_id)
2258            } else {
2259                processor
2260            };
2261
2262            processor.process(query, QueryLanguage::Gql, Some(&params))
2263        })
2264    }
2265
2266    /// Executes a GQL query with parameters.
2267    ///
2268    /// # Errors
2269    ///
2270    /// Returns an error if no query language is enabled.
2271    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2272    pub fn execute_with_params(
2273        &self,
2274        _query: &str,
2275        _params: std::collections::HashMap<String, Value>,
2276    ) -> Result<QueryResult> {
2277        Err(grafeo_common::utils::error::Error::Internal(
2278            "No query language enabled".to_string(),
2279        ))
2280    }
2281
2282    /// Executes a GQL query.
2283    ///
2284    /// # Errors
2285    ///
2286    /// Returns an error if no query language is enabled.
2287    #[cfg(not(any(feature = "gql", feature = "cypher")))]
2288    pub fn execute(&self, _query: &str) -> Result<QueryResult> {
2289        Err(grafeo_common::utils::error::Error::Internal(
2290            "No query language enabled".to_string(),
2291        ))
2292    }
2293
2294    /// Executes a Cypher query.
2295    ///
2296    /// # Errors
2297    ///
2298    /// Returns an error if the query fails to parse or execute.
2299    #[cfg(feature = "cypher")]
2300    pub fn execute_cypher(&self, query: &str) -> Result<QueryResult> {
2301        use crate::query::{
2302            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer,
2303            processor::QueryLanguage, translators::cypher,
2304        };
2305        use grafeo_common::utils::error::{Error as GrafeoError, QueryError, QueryErrorKind};
2306
2307        // Handle schema DDL and SHOW commands before the normal query path
2308        let translation = cypher::translate_full(query)?;
2309        match translation {
2310            cypher::CypherTranslationResult::SchemaCommand(cmd) => {
2311                if *self.read_only_tx.lock() {
2312                    return Err(GrafeoError::Query(QueryError::new(
2313                        QueryErrorKind::Semantic,
2314                        "Cannot execute schema DDL in a read-only transaction",
2315                    )));
2316                }
2317                return self.execute_schema_command(cmd);
2318            }
2319            cypher::CypherTranslationResult::ShowIndexes => {
2320                return self.execute_show_indexes();
2321            }
2322            cypher::CypherTranslationResult::ShowConstraints => {
2323                return self.execute_show_constraints();
2324            }
2325            cypher::CypherTranslationResult::ShowCurrentGraphType => {
2326                return self.execute_show_current_graph_type();
2327            }
2328            cypher::CypherTranslationResult::Plan(_) => {
2329                // Fall through to normal execution below
2330            }
2331        }
2332
2333        #[cfg(not(target_arch = "wasm32"))]
2334        let start_time = std::time::Instant::now();
2335
2336        // Create cache key for this query
2337        let cache_key = CacheKey::with_graph(query, QueryLanguage::Cypher, self.current_graph());
2338
2339        // Try to get cached optimized plan
2340        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2341            cached_plan
2342        } else {
2343            // Parse and translate the query to a logical plan
2344            let logical_plan = cypher::translate(query)?;
2345
2346            // Semantic validation
2347            let mut binder = Binder::new();
2348            let _binding_context = binder.bind(&logical_plan)?;
2349
2350            // Optimize the plan
2351            let active = self.active_store();
2352            let optimizer = Optimizer::from_graph_store(&*active);
2353            let plan = optimizer.optimize(logical_plan)?;
2354
2355            // Cache the optimized plan
2356            self.query_cache.put_optimized(cache_key, plan.clone());
2357
2358            plan
2359        };
2360
2361        // Resolve the active store for query execution
2362        let active = self.active_store();
2363
2364        // EXPLAIN
2365        if optimized_plan.explain {
2366            use crate::query::processor::{annotate_pushdown_hints, explain_result};
2367            let mut plan = optimized_plan;
2368            annotate_pushdown_hints(&mut plan.root, active.as_ref());
2369            return Ok(explain_result(&plan));
2370        }
2371
2372        // PROFILE
2373        if optimized_plan.profile {
2374            let has_mutations = optimized_plan.root.has_mutations();
2375            return self.with_auto_commit(has_mutations, || {
2376                let (viewing_epoch, transaction_id) = self.get_transaction_context();
2377                let planner = self.create_planner_for_store(
2378                    Arc::clone(&active),
2379                    viewing_epoch,
2380                    transaction_id,
2381                );
2382                let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
2383
2384                let executor = Executor::with_columns(physical_plan.columns.clone())
2385                    .with_deadline(self.query_deadline());
2386                let _result = executor.execute(physical_plan.operator.as_mut())?;
2387
2388                let total_time_ms;
2389                #[cfg(not(target_arch = "wasm32"))]
2390                {
2391                    total_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
2392                }
2393                #[cfg(target_arch = "wasm32")]
2394                {
2395                    total_time_ms = 0.0;
2396                }
2397
2398                let profile_tree = crate::query::profile::build_profile_tree(
2399                    &optimized_plan.root,
2400                    &mut entries.into_iter(),
2401                );
2402                Ok(crate::query::profile::profile_result(
2403                    &profile_tree,
2404                    total_time_ms,
2405                ))
2406            });
2407        }
2408
2409        let has_mutations = optimized_plan.root.has_mutations();
2410
2411        self.with_auto_commit(has_mutations, || {
2412            // Get transaction context for MVCC visibility
2413            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2414
2415            // Convert to physical plan with transaction context
2416            let planner =
2417                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2418            let mut physical_plan = planner.plan(&optimized_plan)?;
2419
2420            // Execute the plan
2421            let executor = Executor::with_columns(physical_plan.columns.clone())
2422                .with_deadline(self.query_deadline());
2423            executor.execute(physical_plan.operator.as_mut())
2424        })
2425    }
2426
2427    /// Executes a Gremlin query.
2428    ///
2429    /// # Errors
2430    ///
2431    /// Returns an error if the query fails to parse or execute.
2432    ///
2433    /// # Examples
2434    ///
2435    /// ```no_run
2436    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2437    /// use grafeo_engine::GrafeoDB;
2438    ///
2439    /// let db = GrafeoDB::new_in_memory();
2440    /// let session = db.session();
2441    ///
2442    /// // Create some nodes first
2443    /// session.create_node(&["Person"]);
2444    ///
2445    /// // Query using Gremlin
2446    /// let result = session.execute_gremlin("g.V().hasLabel('Person')")?;
2447    /// # Ok(())
2448    /// # }
2449    /// ```
2450    #[cfg(feature = "gremlin")]
2451    pub fn execute_gremlin(&self, query: &str) -> Result<QueryResult> {
2452        use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::gremlin};
2453
2454        // Parse and translate the query to a logical plan
2455        let logical_plan = gremlin::translate(query)?;
2456
2457        // Semantic validation
2458        let mut binder = Binder::new();
2459        let _binding_context = binder.bind(&logical_plan)?;
2460
2461        // Optimize the plan
2462        let active = self.active_store();
2463        let optimizer = Optimizer::from_graph_store(&*active);
2464        let optimized_plan = optimizer.optimize(logical_plan)?;
2465
2466        let has_mutations = optimized_plan.root.has_mutations();
2467
2468        self.with_auto_commit(has_mutations, || {
2469            // Get transaction context for MVCC visibility
2470            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2471
2472            // Convert to physical plan with transaction context
2473            let planner =
2474                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2475            let mut physical_plan = planner.plan(&optimized_plan)?;
2476
2477            // Execute the plan
2478            let executor = Executor::with_columns(physical_plan.columns.clone())
2479                .with_deadline(self.query_deadline());
2480            executor.execute(physical_plan.operator.as_mut())
2481        })
2482    }
2483
2484    /// Executes a Gremlin query with parameters.
2485    ///
2486    /// # Errors
2487    ///
2488    /// Returns an error if the query fails to parse or execute.
2489    #[cfg(feature = "gremlin")]
2490    pub fn execute_gremlin_with_params(
2491        &self,
2492        query: &str,
2493        params: std::collections::HashMap<String, Value>,
2494    ) -> Result<QueryResult> {
2495        use crate::query::processor::{QueryLanguage, QueryProcessor};
2496
2497        let has_mutations = Self::query_looks_like_mutation(query);
2498        let active = self.active_store();
2499
2500        self.with_auto_commit(has_mutations, || {
2501            // Get transaction context for MVCC visibility
2502            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2503
2504            // Create processor with transaction context
2505            let processor = QueryProcessor::for_graph_store_with_transaction(
2506                Arc::clone(&active),
2507                Arc::clone(&self.transaction_manager),
2508            )?;
2509
2510            // Apply transaction context if in a transaction
2511            let processor = if let Some(transaction_id) = transaction_id {
2512                processor.with_transaction_context(viewing_epoch, transaction_id)
2513            } else {
2514                processor
2515            };
2516
2517            processor.process(query, QueryLanguage::Gremlin, Some(&params))
2518        })
2519    }
2520
2521    /// Executes a GraphQL query against the LPG store.
2522    ///
2523    /// # Errors
2524    ///
2525    /// Returns an error if the query fails to parse or execute.
2526    ///
2527    /// # Examples
2528    ///
2529    /// ```no_run
2530    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2531    /// use grafeo_engine::GrafeoDB;
2532    ///
2533    /// let db = GrafeoDB::new_in_memory();
2534    /// let session = db.session();
2535    ///
2536    /// // Create some nodes first
2537    /// session.create_node(&["User"]);
2538    ///
2539    /// // Query using GraphQL
2540    /// let result = session.execute_graphql("query { user { id name } }")?;
2541    /// # Ok(())
2542    /// # }
2543    /// ```
2544    #[cfg(feature = "graphql")]
2545    pub fn execute_graphql(&self, query: &str) -> Result<QueryResult> {
2546        use crate::query::{Executor, binder::Binder, optimizer::Optimizer, translators::graphql};
2547
2548        // Parse and translate the query to a logical plan
2549        let logical_plan = graphql::translate(query)?;
2550
2551        // Semantic validation
2552        let mut binder = Binder::new();
2553        let _binding_context = binder.bind(&logical_plan)?;
2554
2555        // Optimize the plan
2556        let active = self.active_store();
2557        let optimizer = Optimizer::from_graph_store(&*active);
2558        let optimized_plan = optimizer.optimize(logical_plan)?;
2559
2560        let has_mutations = optimized_plan.root.has_mutations();
2561
2562        self.with_auto_commit(has_mutations, || {
2563            // Get transaction context for MVCC visibility
2564            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2565
2566            // Convert to physical plan with transaction context
2567            let planner =
2568                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2569            let mut physical_plan = planner.plan(&optimized_plan)?;
2570
2571            // Execute the plan
2572            let executor = Executor::with_columns(physical_plan.columns.clone())
2573                .with_deadline(self.query_deadline());
2574            executor.execute(physical_plan.operator.as_mut())
2575        })
2576    }
2577
2578    /// Executes a GraphQL query with parameters.
2579    ///
2580    /// # Errors
2581    ///
2582    /// Returns an error if the query fails to parse or execute.
2583    #[cfg(feature = "graphql")]
2584    pub fn execute_graphql_with_params(
2585        &self,
2586        query: &str,
2587        params: std::collections::HashMap<String, Value>,
2588    ) -> Result<QueryResult> {
2589        use crate::query::processor::{QueryLanguage, QueryProcessor};
2590
2591        let has_mutations = Self::query_looks_like_mutation(query);
2592        let active = self.active_store();
2593
2594        self.with_auto_commit(has_mutations, || {
2595            // Get transaction context for MVCC visibility
2596            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2597
2598            // Create processor with transaction context
2599            let processor = QueryProcessor::for_graph_store_with_transaction(
2600                Arc::clone(&active),
2601                Arc::clone(&self.transaction_manager),
2602            )?;
2603
2604            // Apply transaction context if in a transaction
2605            let processor = if let Some(transaction_id) = transaction_id {
2606                processor.with_transaction_context(viewing_epoch, transaction_id)
2607            } else {
2608                processor
2609            };
2610
2611            processor.process(query, QueryLanguage::GraphQL, Some(&params))
2612        })
2613    }
2614
2615    /// Executes a GraphQL query against the RDF store.
2616    ///
2617    /// # Errors
2618    ///
2619    /// Returns an error if the query fails to parse or execute.
2620    #[cfg(all(feature = "graphql", feature = "rdf"))]
2621    pub fn execute_graphql_rdf(&self, query: &str) -> Result<QueryResult> {
2622        use crate::query::{
2623            Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, translators::graphql_rdf,
2624        };
2625
2626        let logical_plan = graphql_rdf::translate(query, "http://example.org/")?;
2627
2628        let active = self.active_store();
2629        let optimizer = Optimizer::from_graph_store(&*active);
2630        let optimized_plan = optimizer.optimize(logical_plan)?;
2631
2632        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store))
2633            .with_transaction_id(*self.current_transaction.lock());
2634        #[cfg(feature = "wal")]
2635        let planner = planner.with_wal(self.wal.clone());
2636        let mut physical_plan = planner.plan(&optimized_plan)?;
2637
2638        let executor = Executor::with_columns(physical_plan.columns.clone())
2639            .with_deadline(self.query_deadline());
2640        executor.execute(physical_plan.operator.as_mut())
2641    }
2642
2643    /// Executes a GraphQL query against the RDF store with parameters.
2644    ///
2645    /// # Errors
2646    ///
2647    /// Returns an error if the query fails to parse or execute.
2648    #[cfg(all(feature = "graphql", feature = "rdf"))]
2649    pub fn execute_graphql_rdf_with_params(
2650        &self,
2651        query: &str,
2652        params: std::collections::HashMap<String, Value>,
2653    ) -> Result<QueryResult> {
2654        use crate::query::processor::{QueryLanguage, QueryProcessor};
2655
2656        let has_mutations = Self::query_looks_like_mutation(query);
2657        let active = self.active_store();
2658
2659        self.with_auto_commit(has_mutations, || {
2660            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2661
2662            let processor = QueryProcessor::for_graph_store_with_transaction(
2663                Arc::clone(&active),
2664                Arc::clone(&self.transaction_manager),
2665            )?;
2666
2667            let processor = if let Some(transaction_id) = transaction_id {
2668                processor.with_transaction_context(viewing_epoch, transaction_id)
2669            } else {
2670                processor
2671            };
2672
2673            processor.process(query, QueryLanguage::GraphQLRdf, Some(&params))
2674        })
2675    }
2676
2677    /// Executes a SQL/PGQ query (SQL:2023 GRAPH_TABLE).
2678    ///
2679    /// # Errors
2680    ///
2681    /// Returns an error if the query fails to parse or execute.
2682    ///
2683    /// # Examples
2684    ///
2685    /// ```no_run
2686    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2687    /// use grafeo_engine::GrafeoDB;
2688    ///
2689    /// let db = GrafeoDB::new_in_memory();
2690    /// let session = db.session();
2691    ///
2692    /// let result = session.execute_sql(
2693    ///     "SELECT * FROM GRAPH_TABLE (
2694    ///         MATCH (n:Person)
2695    ///         COLUMNS (n.name AS name)
2696    ///     )"
2697    /// )?;
2698    /// # Ok(())
2699    /// # }
2700    /// ```
2701    #[cfg(feature = "sql-pgq")]
2702    pub fn execute_sql(&self, query: &str) -> Result<QueryResult> {
2703        use crate::query::{
2704            Executor, binder::Binder, cache::CacheKey, optimizer::Optimizer, plan::LogicalOperator,
2705            processor::QueryLanguage, translators::sql_pgq,
2706        };
2707
2708        // Parse and translate (always needed to check for DDL)
2709        let logical_plan = sql_pgq::translate(query)?;
2710
2711        // Handle DDL statements directly (they don't go through the query pipeline)
2712        if let LogicalOperator::CreatePropertyGraph(ref cpg) = logical_plan.root {
2713            return Ok(QueryResult {
2714                columns: vec!["status".into()],
2715                column_types: vec![grafeo_common::types::LogicalType::String],
2716                rows: vec![vec![Value::from(format!(
2717                    "Property graph '{}' created ({} node tables, {} edge tables)",
2718                    cpg.name,
2719                    cpg.node_tables.len(),
2720                    cpg.edge_tables.len()
2721                ))]],
2722                execution_time_ms: None,
2723                rows_scanned: None,
2724                status_message: None,
2725                gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
2726            });
2727        }
2728
2729        // Create cache key for query plans
2730        let cache_key = CacheKey::with_graph(query, QueryLanguage::SqlPgq, self.current_graph());
2731
2732        // Try to get cached optimized plan
2733        let optimized_plan = if let Some(cached_plan) = self.query_cache.get_optimized(&cache_key) {
2734            cached_plan
2735        } else {
2736            // Semantic validation
2737            let mut binder = Binder::new();
2738            let _binding_context = binder.bind(&logical_plan)?;
2739
2740            // Optimize the plan
2741            let active = self.active_store();
2742            let optimizer = Optimizer::from_graph_store(&*active);
2743            let plan = optimizer.optimize(logical_plan)?;
2744
2745            // Cache the optimized plan
2746            self.query_cache.put_optimized(cache_key, plan.clone());
2747
2748            plan
2749        };
2750
2751        let active = self.active_store();
2752        let has_mutations = optimized_plan.root.has_mutations();
2753
2754        self.with_auto_commit(has_mutations, || {
2755            // Get transaction context for MVCC visibility
2756            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2757
2758            // Convert to physical plan with transaction context
2759            let planner =
2760                self.create_planner_for_store(Arc::clone(&active), viewing_epoch, transaction_id);
2761            let mut physical_plan = planner.plan(&optimized_plan)?;
2762
2763            // Execute the plan
2764            let executor = Executor::with_columns(physical_plan.columns.clone())
2765                .with_deadline(self.query_deadline());
2766            executor.execute(physical_plan.operator.as_mut())
2767        })
2768    }
2769
2770    /// Executes a SQL/PGQ query with parameters.
2771    ///
2772    /// # Errors
2773    ///
2774    /// Returns an error if the query fails to parse or execute.
2775    #[cfg(feature = "sql-pgq")]
2776    pub fn execute_sql_with_params(
2777        &self,
2778        query: &str,
2779        params: std::collections::HashMap<String, Value>,
2780    ) -> Result<QueryResult> {
2781        use crate::query::processor::{QueryLanguage, QueryProcessor};
2782
2783        let has_mutations = Self::query_looks_like_mutation(query);
2784        let active = self.active_store();
2785
2786        self.with_auto_commit(has_mutations, || {
2787            // Get transaction context for MVCC visibility
2788            let (viewing_epoch, transaction_id) = self.get_transaction_context();
2789
2790            // Create processor with transaction context
2791            let processor = QueryProcessor::for_graph_store_with_transaction(
2792                Arc::clone(&active),
2793                Arc::clone(&self.transaction_manager),
2794            )?;
2795
2796            // Apply transaction context if in a transaction
2797            let processor = if let Some(transaction_id) = transaction_id {
2798                processor.with_transaction_context(viewing_epoch, transaction_id)
2799            } else {
2800                processor
2801            };
2802
2803            processor.process(query, QueryLanguage::SqlPgq, Some(&params))
2804        })
2805    }
2806
2807    /// Executes a SPARQL query.
2808    ///
2809    /// # Errors
2810    ///
2811    /// Returns an error if the query fails to parse or execute.
2812    #[cfg(all(feature = "sparql", feature = "rdf"))]
2813    pub fn execute_sparql(&self, query: &str) -> Result<QueryResult> {
2814        use crate::query::{
2815            Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, translators::sparql,
2816        };
2817
2818        // Parse and translate the SPARQL query to a logical plan
2819        let logical_plan = sparql::translate(query)?;
2820
2821        // Optimize the plan
2822        let active = self.active_store();
2823        let optimizer = Optimizer::from_graph_store(&*active);
2824        let optimized_plan = optimizer.optimize(logical_plan)?;
2825
2826        // Convert to physical plan using RDF planner
2827        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store))
2828            .with_transaction_id(*self.current_transaction.lock());
2829        #[cfg(feature = "wal")]
2830        let planner = planner.with_wal(self.wal.clone());
2831        let mut physical_plan = planner.plan(&optimized_plan)?;
2832
2833        // Execute the plan
2834        let executor = Executor::with_columns(physical_plan.columns.clone())
2835            .with_deadline(self.query_deadline());
2836        executor.execute(physical_plan.operator.as_mut())
2837    }
2838
2839    /// Executes a SPARQL query with parameters.
2840    ///
2841    /// # Errors
2842    ///
2843    /// Returns an error if the query fails to parse or execute.
2844    #[cfg(all(feature = "sparql", feature = "rdf"))]
2845    pub fn execute_sparql_with_params(
2846        &self,
2847        query: &str,
2848        params: std::collections::HashMap<String, Value>,
2849    ) -> Result<QueryResult> {
2850        use crate::query::{
2851            Executor, optimizer::Optimizer, planner::rdf::RdfPlanner, processor::substitute_params,
2852            translators::sparql,
2853        };
2854
2855        let mut logical_plan = sparql::translate(query)?;
2856
2857        substitute_params(&mut logical_plan, &params)?;
2858
2859        let active = self.active_store();
2860        let optimizer = Optimizer::from_graph_store(&*active);
2861        let optimized_plan = optimizer.optimize(logical_plan)?;
2862
2863        let planner = RdfPlanner::new(Arc::clone(&self.rdf_store))
2864            .with_transaction_id(*self.current_transaction.lock());
2865        #[cfg(feature = "wal")]
2866        let planner = planner.with_wal(self.wal.clone());
2867        let mut physical_plan = planner.plan(&optimized_plan)?;
2868
2869        let executor = Executor::with_columns(physical_plan.columns.clone())
2870            .with_deadline(self.query_deadline());
2871        executor.execute(physical_plan.operator.as_mut())
2872    }
2873
2874    /// Executes a query in the specified language by name.
2875    ///
2876    /// Supported language names: `"gql"`, `"cypher"`, `"gremlin"`, `"graphql"`,
2877    /// `"graphql-rdf"`, `"sparql"`, `"sql"`. Each requires the corresponding feature flag.
2878    ///
2879    /// # Errors
2880    ///
2881    /// Returns an error if the language is unknown/disabled or the query fails.
2882    pub fn execute_language(
2883        &self,
2884        query: &str,
2885        language: &str,
2886        params: Option<std::collections::HashMap<String, Value>>,
2887    ) -> Result<QueryResult> {
2888        match language {
2889            "gql" => {
2890                if let Some(p) = params {
2891                    self.execute_with_params(query, p)
2892                } else {
2893                    self.execute(query)
2894                }
2895            }
2896            #[cfg(feature = "cypher")]
2897            "cypher" => {
2898                if let Some(p) = params {
2899                    use crate::query::processor::{QueryLanguage, QueryProcessor};
2900                    let has_mutations = Self::query_looks_like_mutation(query);
2901                    let active = self.active_store();
2902                    self.with_auto_commit(has_mutations, || {
2903                        let processor = QueryProcessor::for_graph_store_with_transaction(
2904                            Arc::clone(&active),
2905                            Arc::clone(&self.transaction_manager),
2906                        )?;
2907                        let (viewing_epoch, transaction_id) = self.get_transaction_context();
2908                        let processor = if let Some(transaction_id) = transaction_id {
2909                            processor.with_transaction_context(viewing_epoch, transaction_id)
2910                        } else {
2911                            processor
2912                        };
2913                        processor.process(query, QueryLanguage::Cypher, Some(&p))
2914                    })
2915                } else {
2916                    self.execute_cypher(query)
2917                }
2918            }
2919            #[cfg(feature = "gremlin")]
2920            "gremlin" => {
2921                if let Some(p) = params {
2922                    self.execute_gremlin_with_params(query, p)
2923                } else {
2924                    self.execute_gremlin(query)
2925                }
2926            }
2927            #[cfg(feature = "graphql")]
2928            "graphql" => {
2929                if let Some(p) = params {
2930                    self.execute_graphql_with_params(query, p)
2931                } else {
2932                    self.execute_graphql(query)
2933                }
2934            }
2935            #[cfg(all(feature = "graphql", feature = "rdf"))]
2936            "graphql-rdf" => {
2937                if let Some(p) = params {
2938                    self.execute_graphql_rdf_with_params(query, p)
2939                } else {
2940                    self.execute_graphql_rdf(query)
2941                }
2942            }
2943            #[cfg(feature = "sql-pgq")]
2944            "sql" | "sql-pgq" => {
2945                if let Some(p) = params {
2946                    self.execute_sql_with_params(query, p)
2947                } else {
2948                    self.execute_sql(query)
2949                }
2950            }
2951            #[cfg(all(feature = "sparql", feature = "rdf"))]
2952            "sparql" => {
2953                if let Some(p) = params {
2954                    self.execute_sparql_with_params(query, p)
2955                } else {
2956                    self.execute_sparql(query)
2957                }
2958            }
2959            other => Err(grafeo_common::utils::error::Error::Query(
2960                grafeo_common::utils::error::QueryError::new(
2961                    grafeo_common::utils::error::QueryErrorKind::Semantic,
2962                    format!("Unknown query language: '{other}'"),
2963                ),
2964            )),
2965        }
2966    }
2967
2968    /// Begins a new transaction.
2969    ///
2970    /// # Errors
2971    ///
2972    /// Returns an error if a transaction is already active.
2973    ///
2974    /// # Examples
2975    ///
2976    /// ```no_run
2977    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
2978    /// use grafeo_engine::GrafeoDB;
2979    ///
2980    /// let db = GrafeoDB::new_in_memory();
2981    /// let mut session = db.session();
2982    ///
2983    /// session.begin_transaction()?;
2984    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
2985    /// session.execute("INSERT (:Person {name: 'Gus'})")?;
2986    /// session.commit()?; // Both inserts committed atomically
2987    /// # Ok(())
2988    /// # }
2989    /// ```
2990    /// Clears all cached query plans.
2991    ///
2992    /// The plan cache is shared across all sessions on the same database,
2993    /// so clearing from one session affects all sessions.
2994    pub fn clear_plan_cache(&self) {
2995        self.query_cache.clear();
2996    }
2997
2998    /// Begins a new transaction on this session.
2999    ///
3000    /// Uses the default isolation level (`SnapshotIsolation`).
3001    ///
3002    /// # Errors
3003    ///
3004    /// Returns an error if a transaction is already active.
3005    pub fn begin_transaction(&mut self) -> Result<()> {
3006        self.begin_transaction_inner(false, None)
3007    }
3008
3009    /// Begins a transaction with a specific isolation level.
3010    ///
3011    /// See [`begin_transaction`](Self::begin_transaction) for the default (`SnapshotIsolation`).
3012    ///
3013    /// # Errors
3014    ///
3015    /// Returns an error if a transaction is already active.
3016    pub fn begin_transaction_with_isolation(
3017        &mut self,
3018        isolation_level: crate::transaction::IsolationLevel,
3019    ) -> Result<()> {
3020        self.begin_transaction_inner(false, Some(isolation_level))
3021    }
3022
3023    /// Core transaction begin logic, usable from both `&mut self` and `&self` paths.
3024    fn begin_transaction_inner(
3025        &self,
3026        read_only: bool,
3027        isolation_level: Option<crate::transaction::IsolationLevel>,
3028    ) -> Result<()> {
3029        let mut current = self.current_transaction.lock();
3030        if current.is_some() {
3031            // Nested transaction: create an auto-savepoint instead of a new tx.
3032            drop(current);
3033            let mut depth = self.transaction_nesting_depth.lock();
3034            *depth += 1;
3035            let sp_name = format!("_nested_tx_{}", *depth);
3036            self.savepoint(&sp_name)?;
3037            return Ok(());
3038        }
3039
3040        let active = self.active_lpg_store();
3041        self.transaction_start_node_count
3042            .store(active.node_count(), Ordering::Relaxed);
3043        self.transaction_start_edge_count
3044            .store(active.edge_count(), Ordering::Relaxed);
3045        let transaction_id = if let Some(level) = isolation_level {
3046            self.transaction_manager.begin_with_isolation(level)
3047        } else {
3048            self.transaction_manager.begin()
3049        };
3050        *current = Some(transaction_id);
3051        *self.read_only_tx.lock() = read_only;
3052
3053        // Record the initial graph as "touched" for cross-graph atomicity.
3054        // Uses the full storage key (schema/graph) for schema-scoped resolution.
3055        let key = self.active_graph_storage_key();
3056        let mut touched = self.touched_graphs.lock();
3057        touched.clear();
3058        touched.push(key);
3059
3060        Ok(())
3061    }
3062
3063    /// Commits the current transaction.
3064    ///
3065    /// Makes all changes since [`begin_transaction`](Self::begin_transaction) permanent.
3066    ///
3067    /// # Errors
3068    ///
3069    /// Returns an error if no transaction is active.
3070    pub fn commit(&mut self) -> Result<()> {
3071        self.commit_inner()
3072    }
3073
3074    /// Core commit logic, usable from both `&mut self` and `&self` paths.
3075    fn commit_inner(&self) -> Result<()> {
3076        // Nested transaction: release the auto-savepoint (changes are preserved).
3077        {
3078            let mut depth = self.transaction_nesting_depth.lock();
3079            if *depth > 0 {
3080                let sp_name = format!("_nested_tx_{depth}");
3081                *depth -= 1;
3082                drop(depth);
3083                return self.release_savepoint(&sp_name);
3084            }
3085        }
3086
3087        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3088            grafeo_common::utils::error::Error::Transaction(
3089                grafeo_common::utils::error::TransactionError::InvalidState(
3090                    "No active transaction".to_string(),
3091                ),
3092            )
3093        })?;
3094
3095        // Validate the transaction first (conflict detection) before committing data.
3096        // If this fails, we rollback the data changes instead of making them permanent.
3097        let touched = self.touched_graphs.lock().clone();
3098        let commit_epoch = match self.transaction_manager.commit(transaction_id) {
3099            Ok(epoch) => epoch,
3100            Err(e) => {
3101                // Conflict detected: rollback the data changes
3102                for graph_name in &touched {
3103                    let store = self.resolve_store(graph_name);
3104                    store.rollback_transaction_properties(transaction_id);
3105                }
3106                #[cfg(feature = "rdf")]
3107                self.rdf_store.rollback_transaction(transaction_id);
3108                *self.read_only_tx.lock() = false;
3109                self.savepoints.lock().clear();
3110                self.touched_graphs.lock().clear();
3111                return Err(e);
3112            }
3113        };
3114
3115        // Finalize PENDING epochs: make uncommitted versions visible at the commit epoch.
3116        for graph_name in &touched {
3117            let store = self.resolve_store(graph_name);
3118            store.finalize_version_epochs(transaction_id, commit_epoch);
3119        }
3120
3121        // Commit succeeded: discard undo logs (make changes permanent)
3122        #[cfg(feature = "rdf")]
3123        self.rdf_store.commit_transaction(transaction_id);
3124
3125        for graph_name in &touched {
3126            let store = self.resolve_store(graph_name);
3127            store.commit_transaction_properties(transaction_id);
3128        }
3129
3130        // Sync epoch for all touched graphs so that convenience lookups
3131        // (edge_type, get_edge, get_node) can see versions at the latest epoch.
3132        let current_epoch = self.transaction_manager.current_epoch();
3133        for graph_name in &touched {
3134            let store = self.resolve_store(graph_name);
3135            store.sync_epoch(current_epoch);
3136        }
3137
3138        // Reset read-only flag, clear savepoints and touched graphs
3139        *self.read_only_tx.lock() = false;
3140        self.savepoints.lock().clear();
3141        self.touched_graphs.lock().clear();
3142
3143        // Auto-GC: periodically prune old MVCC versions
3144        if self.gc_interval > 0 {
3145            let count = self.commit_counter.fetch_add(1, Ordering::Relaxed) + 1;
3146            if count.is_multiple_of(self.gc_interval) {
3147                let min_epoch = self.transaction_manager.min_active_epoch();
3148                for graph_name in &touched {
3149                    let store = self.resolve_store(graph_name);
3150                    store.gc_versions(min_epoch);
3151                }
3152                self.transaction_manager.gc();
3153            }
3154        }
3155
3156        Ok(())
3157    }
3158
3159    /// Aborts the current transaction.
3160    ///
3161    /// Discards all changes since [`begin_transaction`](Self::begin_transaction).
3162    ///
3163    /// # Errors
3164    ///
3165    /// Returns an error if no transaction is active.
3166    ///
3167    /// # Examples
3168    ///
3169    /// ```no_run
3170    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3171    /// use grafeo_engine::GrafeoDB;
3172    ///
3173    /// let db = GrafeoDB::new_in_memory();
3174    /// let mut session = db.session();
3175    ///
3176    /// session.begin_transaction()?;
3177    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3178    /// session.rollback()?; // Insert is discarded
3179    /// # Ok(())
3180    /// # }
3181    /// ```
3182    pub fn rollback(&mut self) -> Result<()> {
3183        self.rollback_inner()
3184    }
3185
3186    /// Core rollback logic, usable from both `&mut self` and `&self` paths.
3187    fn rollback_inner(&self) -> Result<()> {
3188        // Nested transaction: rollback to the auto-savepoint.
3189        {
3190            let mut depth = self.transaction_nesting_depth.lock();
3191            if *depth > 0 {
3192                let sp_name = format!("_nested_tx_{depth}");
3193                *depth -= 1;
3194                drop(depth);
3195                return self.rollback_to_savepoint(&sp_name);
3196            }
3197        }
3198
3199        let transaction_id = self.current_transaction.lock().take().ok_or_else(|| {
3200            grafeo_common::utils::error::Error::Transaction(
3201                grafeo_common::utils::error::TransactionError::InvalidState(
3202                    "No active transaction".to_string(),
3203                ),
3204            )
3205        })?;
3206
3207        // Reset read-only flag
3208        *self.read_only_tx.lock() = false;
3209
3210        // Discard uncommitted versions in ALL touched LPG stores (cross-graph atomicity).
3211        let touched = self.touched_graphs.lock().clone();
3212        for graph_name in &touched {
3213            let store = self.resolve_store(graph_name);
3214            store.discard_uncommitted_versions(transaction_id);
3215        }
3216
3217        // Discard pending operations in the RDF store
3218        #[cfg(feature = "rdf")]
3219        self.rdf_store.rollback_transaction(transaction_id);
3220
3221        // Clear savepoints and touched graphs
3222        self.savepoints.lock().clear();
3223        self.touched_graphs.lock().clear();
3224
3225        // Mark transaction as aborted in the manager
3226        self.transaction_manager.abort(transaction_id)
3227    }
3228
3229    /// Creates a named savepoint within the current transaction.
3230    ///
3231    /// The savepoint captures the current node/edge ID counters so that
3232    /// [`rollback_to_savepoint`](Self::rollback_to_savepoint) can discard
3233    /// entities created after this point.
3234    ///
3235    /// # Errors
3236    ///
3237    /// Returns an error if no transaction is active.
3238    pub fn savepoint(&self, name: &str) -> Result<()> {
3239        let tx_id = self.current_transaction.lock().ok_or_else(|| {
3240            grafeo_common::utils::error::Error::Transaction(
3241                grafeo_common::utils::error::TransactionError::InvalidState(
3242                    "No active transaction".to_string(),
3243                ),
3244            )
3245        })?;
3246
3247        // Capture state for every graph touched so far.
3248        let touched = self.touched_graphs.lock().clone();
3249        let graph_snapshots: Vec<GraphSavepoint> = touched
3250            .iter()
3251            .map(|graph_name| {
3252                let store = self.resolve_store(graph_name);
3253                GraphSavepoint {
3254                    graph_name: graph_name.clone(),
3255                    next_node_id: store.peek_next_node_id(),
3256                    next_edge_id: store.peek_next_edge_id(),
3257                    undo_log_position: store.property_undo_log_position(tx_id),
3258                }
3259            })
3260            .collect();
3261
3262        self.savepoints.lock().push(SavepointState {
3263            name: name.to_string(),
3264            graph_snapshots,
3265            active_graph: self.current_graph.lock().clone(),
3266        });
3267        Ok(())
3268    }
3269
3270    /// Rolls back to a named savepoint, undoing all writes made after it.
3271    ///
3272    /// The savepoint and any savepoints created after it are removed.
3273    /// Entities with IDs >= the savepoint snapshot are discarded.
3274    ///
3275    /// # Errors
3276    ///
3277    /// Returns an error if no transaction is active or the savepoint does not exist.
3278    pub fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
3279        let transaction_id = self.current_transaction.lock().ok_or_else(|| {
3280            grafeo_common::utils::error::Error::Transaction(
3281                grafeo_common::utils::error::TransactionError::InvalidState(
3282                    "No active transaction".to_string(),
3283                ),
3284            )
3285        })?;
3286
3287        let mut savepoints = self.savepoints.lock();
3288
3289        // Find the savepoint by name (search from the end for nested savepoints)
3290        let pos = savepoints
3291            .iter()
3292            .rposition(|sp| sp.name == name)
3293            .ok_or_else(|| {
3294                grafeo_common::utils::error::Error::Transaction(
3295                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
3296                        "Savepoint '{name}' not found"
3297                    )),
3298                )
3299            })?;
3300
3301        let sp_state = savepoints[pos].clone();
3302
3303        // Remove this savepoint and all later ones
3304        savepoints.truncate(pos);
3305        drop(savepoints);
3306
3307        // Roll back each graph that was captured in the savepoint.
3308        for gs in &sp_state.graph_snapshots {
3309            let store = self.resolve_store(&gs.graph_name);
3310
3311            // Replay property/label undo entries recorded after the savepoint
3312            store.rollback_transaction_properties_to(transaction_id, gs.undo_log_position);
3313
3314            // Discard entities created after the savepoint
3315            let current_next_node = store.peek_next_node_id();
3316            let current_next_edge = store.peek_next_edge_id();
3317
3318            let node_ids: Vec<NodeId> = (gs.next_node_id..current_next_node)
3319                .map(NodeId::new)
3320                .collect();
3321            let edge_ids: Vec<EdgeId> = (gs.next_edge_id..current_next_edge)
3322                .map(EdgeId::new)
3323                .collect();
3324
3325            if !node_ids.is_empty() || !edge_ids.is_empty() {
3326                store.discard_entities_by_id(transaction_id, &node_ids, &edge_ids);
3327            }
3328        }
3329
3330        // Also roll back any graphs that were touched AFTER the savepoint
3331        // but not captured in it. These need full discard since the savepoint
3332        // didn't include them.
3333        let touched = self.touched_graphs.lock().clone();
3334        for graph_name in &touched {
3335            let already_captured = sp_state
3336                .graph_snapshots
3337                .iter()
3338                .any(|gs| gs.graph_name == *graph_name);
3339            if !already_captured {
3340                let store = self.resolve_store(graph_name);
3341                store.discard_uncommitted_versions(transaction_id);
3342            }
3343        }
3344
3345        // Restore touched_graphs to only the graphs that were known at savepoint time.
3346        let mut touched = self.touched_graphs.lock();
3347        touched.clear();
3348        for gs in &sp_state.graph_snapshots {
3349            if !touched.contains(&gs.graph_name) {
3350                touched.push(gs.graph_name.clone());
3351            }
3352        }
3353
3354        Ok(())
3355    }
3356
3357    /// Releases (removes) a named savepoint without rolling back.
3358    ///
3359    /// # Errors
3360    ///
3361    /// Returns an error if no transaction is active or the savepoint does not exist.
3362    pub fn release_savepoint(&self, name: &str) -> Result<()> {
3363        let _tx_id = self.current_transaction.lock().ok_or_else(|| {
3364            grafeo_common::utils::error::Error::Transaction(
3365                grafeo_common::utils::error::TransactionError::InvalidState(
3366                    "No active transaction".to_string(),
3367                ),
3368            )
3369        })?;
3370
3371        let mut savepoints = self.savepoints.lock();
3372        let pos = savepoints
3373            .iter()
3374            .rposition(|sp| sp.name == name)
3375            .ok_or_else(|| {
3376                grafeo_common::utils::error::Error::Transaction(
3377                    grafeo_common::utils::error::TransactionError::InvalidState(format!(
3378                        "Savepoint '{name}' not found"
3379                    )),
3380                )
3381            })?;
3382        savepoints.remove(pos);
3383        Ok(())
3384    }
3385
3386    /// Returns whether a transaction is active.
3387    #[must_use]
3388    pub fn in_transaction(&self) -> bool {
3389        self.current_transaction.lock().is_some()
3390    }
3391
3392    /// Returns the current transaction ID, if any.
3393    #[must_use]
3394    pub(crate) fn current_transaction_id(&self) -> Option<TransactionId> {
3395        *self.current_transaction.lock()
3396    }
3397
3398    /// Returns a reference to the transaction manager.
3399    #[must_use]
3400    pub(crate) fn transaction_manager(&self) -> &TransactionManager {
3401        &self.transaction_manager
3402    }
3403
3404    /// Returns the store's current node count and the count at transaction start.
3405    #[must_use]
3406    pub(crate) fn node_count_delta(&self) -> (usize, usize) {
3407        (
3408            self.transaction_start_node_count.load(Ordering::Relaxed),
3409            self.active_lpg_store().node_count(),
3410        )
3411    }
3412
3413    /// Returns the store's current edge count and the count at transaction start.
3414    #[must_use]
3415    pub(crate) fn edge_count_delta(&self) -> (usize, usize) {
3416        (
3417            self.transaction_start_edge_count.load(Ordering::Relaxed),
3418            self.active_lpg_store().edge_count(),
3419        )
3420    }
3421
3422    /// Prepares the current transaction for a two-phase commit.
3423    ///
3424    /// Returns a [`PreparedCommit`](crate::transaction::PreparedCommit) that
3425    /// lets you inspect pending changes and attach metadata before finalizing.
3426    /// The mutable borrow prevents concurrent operations while the commit is
3427    /// pending.
3428    ///
3429    /// If the `PreparedCommit` is dropped without calling `commit()` or
3430    /// `abort()`, the transaction is automatically rolled back.
3431    ///
3432    /// # Errors
3433    ///
3434    /// Returns an error if no transaction is active.
3435    ///
3436    /// # Examples
3437    ///
3438    /// ```no_run
3439    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
3440    /// use grafeo_engine::GrafeoDB;
3441    ///
3442    /// let db = GrafeoDB::new_in_memory();
3443    /// let mut session = db.session();
3444    ///
3445    /// session.begin_transaction()?;
3446    /// session.execute("INSERT (:Person {name: 'Alix'})")?;
3447    ///
3448    /// let mut prepared = session.prepare_commit()?;
3449    /// println!("Nodes written: {}", prepared.info().nodes_written);
3450    /// prepared.set_metadata("audit_user", "admin");
3451    /// prepared.commit()?;
3452    /// # Ok(())
3453    /// # }
3454    /// ```
3455    pub fn prepare_commit(&mut self) -> Result<crate::transaction::PreparedCommit<'_>> {
3456        crate::transaction::PreparedCommit::new(self)
3457    }
3458
3459    /// Sets auto-commit mode.
3460    pub fn set_auto_commit(&mut self, auto_commit: bool) {
3461        self.auto_commit = auto_commit;
3462    }
3463
3464    /// Returns whether auto-commit is enabled.
3465    #[must_use]
3466    pub fn auto_commit(&self) -> bool {
3467        self.auto_commit
3468    }
3469
3470    /// Returns `true` if auto-commit should wrap this execution.
3471    ///
3472    /// Auto-commit kicks in when: the session is in auto-commit mode,
3473    /// no explicit transaction is active, and the query mutates data.
3474    fn needs_auto_commit(&self, has_mutations: bool) -> bool {
3475        self.auto_commit && has_mutations && self.current_transaction.lock().is_none()
3476    }
3477
3478    /// Wraps `body` in an automatic begin/commit when [`needs_auto_commit`]
3479    /// returns `true`. On error the transaction is rolled back.
3480    fn with_auto_commit<F>(&self, has_mutations: bool, body: F) -> Result<QueryResult>
3481    where
3482        F: FnOnce() -> Result<QueryResult>,
3483    {
3484        if self.needs_auto_commit(has_mutations) {
3485            self.begin_transaction_inner(false, None)?;
3486            match body() {
3487                Ok(result) => {
3488                    self.commit_inner()?;
3489                    Ok(result)
3490                }
3491                Err(e) => {
3492                    let _ = self.rollback_inner();
3493                    Err(e)
3494                }
3495            }
3496        } else {
3497            body()
3498        }
3499    }
3500
3501    /// Quick heuristic: returns `true` when the query text looks like it
3502    /// performs a mutation. Used by `_with_params` paths that go through the
3503    /// `QueryProcessor` (where the logical plan isn't available before
3504    /// execution). False negatives are harmless: the data just won't be
3505    /// auto-committed, which matches the prior behaviour.
3506    fn query_looks_like_mutation(query: &str) -> bool {
3507        let upper = query.to_ascii_uppercase();
3508        upper.contains("INSERT")
3509            || upper.contains("CREATE")
3510            || upper.contains("DELETE")
3511            || upper.contains("MERGE")
3512            || upper.contains("SET")
3513            || upper.contains("REMOVE")
3514            || upper.contains("DROP")
3515            || upper.contains("ALTER")
3516    }
3517
3518    /// Computes the wall-clock deadline for query execution.
3519    #[must_use]
3520    fn query_deadline(&self) -> Option<Instant> {
3521        #[cfg(not(target_arch = "wasm32"))]
3522        {
3523            self.query_timeout.map(|d| Instant::now() + d)
3524        }
3525        #[cfg(target_arch = "wasm32")]
3526        {
3527            let _ = &self.query_timeout;
3528            None
3529        }
3530    }
3531
3532    /// Evaluates a simple integer literal from a session parameter expression.
3533    fn eval_integer_literal(expr: &grafeo_adapters::query::gql::ast::Expression) -> Option<i64> {
3534        use grafeo_adapters::query::gql::ast::{Expression, Literal};
3535        match expr {
3536            Expression::Literal(Literal::Integer(n)) => Some(*n),
3537            _ => None,
3538        }
3539    }
3540
3541    /// Returns the current transaction context for MVCC visibility.
3542    ///
3543    /// Returns `(viewing_epoch, transaction_id)` where:
3544    /// - `viewing_epoch` is the epoch at which to check version visibility
3545    /// - `transaction_id` is the current transaction ID (if in a transaction)
3546    #[must_use]
3547    fn get_transaction_context(&self) -> (EpochId, Option<TransactionId>) {
3548        // Time-travel override takes precedence (read-only, no tx context)
3549        if let Some(epoch) = *self.viewing_epoch_override.lock() {
3550            return (epoch, None);
3551        }
3552
3553        if let Some(transaction_id) = *self.current_transaction.lock() {
3554            // In a transaction: use the transaction's start epoch
3555            let epoch = self
3556                .transaction_manager
3557                .start_epoch(transaction_id)
3558                .unwrap_or_else(|| self.transaction_manager.current_epoch());
3559            (epoch, Some(transaction_id))
3560        } else {
3561            // No transaction: use current epoch
3562            (self.transaction_manager.current_epoch(), None)
3563        }
3564    }
3565
3566    /// Creates a planner with transaction context and constraint validator.
3567    ///
3568    /// The `store` parameter is the graph store to plan against (use
3569    /// `self.active_store()` for graph-aware execution).
3570    fn create_planner_for_store(
3571        &self,
3572        store: Arc<dyn GraphStoreMut>,
3573        viewing_epoch: EpochId,
3574        transaction_id: Option<TransactionId>,
3575    ) -> crate::query::Planner {
3576        use crate::query::Planner;
3577        use grafeo_core::execution::operators::{LazyValue, SessionContext};
3578
3579        // Capture store reference for lazy introspection (only computed if info()/schema() called).
3580        let info_store = Arc::clone(&store);
3581        let schema_store = Arc::clone(&store);
3582
3583        let session_context = SessionContext {
3584            current_schema: self.current_schema(),
3585            current_graph: self.current_graph(),
3586            db_info: LazyValue::new(move || Self::build_info_value(&*info_store)),
3587            schema_info: LazyValue::new(move || Self::build_schema_value(&*schema_store)),
3588        };
3589
3590        let mut planner = Planner::with_context(
3591            Arc::clone(&store),
3592            Arc::clone(&self.transaction_manager),
3593            transaction_id,
3594            viewing_epoch,
3595        )
3596        .with_factorized_execution(self.factorized_execution)
3597        .with_catalog(Arc::clone(&self.catalog))
3598        .with_session_context(session_context);
3599
3600        // Attach the constraint validator for schema enforcement
3601        let validator =
3602            CatalogConstraintValidator::new(Arc::clone(&self.catalog)).with_store(store);
3603        planner = planner.with_validator(Arc::new(validator));
3604
3605        planner
3606    }
3607
3608    /// Builds a `Value::Map` for the `info()` introspection function.
3609    fn build_info_value(store: &dyn GraphStoreMut) -> Value {
3610        use grafeo_common::types::PropertyKey;
3611        use std::collections::BTreeMap;
3612
3613        let mut map = BTreeMap::new();
3614        map.insert(PropertyKey::from("mode"), Value::String("lpg".into()));
3615        map.insert(
3616            PropertyKey::from("node_count"),
3617            Value::Int64(store.node_count() as i64),
3618        );
3619        map.insert(
3620            PropertyKey::from("edge_count"),
3621            Value::Int64(store.edge_count() as i64),
3622        );
3623        map.insert(
3624            PropertyKey::from("version"),
3625            Value::String(env!("CARGO_PKG_VERSION").into()),
3626        );
3627        Value::Map(map.into())
3628    }
3629
3630    /// Builds a `Value::Map` for the `schema()` introspection function.
3631    fn build_schema_value(store: &dyn GraphStoreMut) -> Value {
3632        use grafeo_common::types::PropertyKey;
3633        use std::collections::BTreeMap;
3634
3635        let labels: Vec<Value> = store
3636            .all_labels()
3637            .into_iter()
3638            .map(|l| Value::String(l.into()))
3639            .collect();
3640        let edge_types: Vec<Value> = store
3641            .all_edge_types()
3642            .into_iter()
3643            .map(|t| Value::String(t.into()))
3644            .collect();
3645        let property_keys: Vec<Value> = store
3646            .all_property_keys()
3647            .into_iter()
3648            .map(|k| Value::String(k.into()))
3649            .collect();
3650
3651        let mut map = BTreeMap::new();
3652        map.insert(PropertyKey::from("labels"), Value::List(labels.into()));
3653        map.insert(
3654            PropertyKey::from("edge_types"),
3655            Value::List(edge_types.into()),
3656        );
3657        map.insert(
3658            PropertyKey::from("property_keys"),
3659            Value::List(property_keys.into()),
3660        );
3661        Value::Map(map.into())
3662    }
3663
3664    /// Creates a node directly (bypassing query execution).
3665    ///
3666    /// This is a low-level API for testing and direct manipulation.
3667    /// If a transaction is active, the node will be versioned with the transaction ID.
3668    pub fn create_node(&self, labels: &[&str]) -> NodeId {
3669        let (epoch, transaction_id) = self.get_transaction_context();
3670        self.active_lpg_store().create_node_versioned(
3671            labels,
3672            epoch,
3673            transaction_id.unwrap_or(TransactionId::SYSTEM),
3674        )
3675    }
3676
3677    /// Creates a node with properties.
3678    ///
3679    /// If a transaction is active, the node will be versioned with the transaction ID.
3680    pub fn create_node_with_props<'a>(
3681        &self,
3682        labels: &[&str],
3683        properties: impl IntoIterator<Item = (&'a str, Value)>,
3684    ) -> NodeId {
3685        let (epoch, transaction_id) = self.get_transaction_context();
3686        self.active_lpg_store().create_node_with_props_versioned(
3687            labels,
3688            properties,
3689            epoch,
3690            transaction_id.unwrap_or(TransactionId::SYSTEM),
3691        )
3692    }
3693
3694    /// Creates an edge between two nodes.
3695    ///
3696    /// This is a low-level API for testing and direct manipulation.
3697    /// If a transaction is active, the edge will be versioned with the transaction ID.
3698    pub fn create_edge(
3699        &self,
3700        src: NodeId,
3701        dst: NodeId,
3702        edge_type: &str,
3703    ) -> grafeo_common::types::EdgeId {
3704        let (epoch, transaction_id) = self.get_transaction_context();
3705        self.active_lpg_store().create_edge_versioned(
3706            src,
3707            dst,
3708            edge_type,
3709            epoch,
3710            transaction_id.unwrap_or(TransactionId::SYSTEM),
3711        )
3712    }
3713
3714    // =========================================================================
3715    // Direct Lookup APIs (bypass query planning for O(1) point reads)
3716    // =========================================================================
3717
3718    /// Gets a node by ID directly, bypassing query planning.
3719    ///
3720    /// This is the fastest way to retrieve a single node when you know its ID.
3721    /// Skips parsing, binding, optimization, and physical planning entirely.
3722    ///
3723    /// # Performance
3724    ///
3725    /// - Time complexity: O(1) average case
3726    /// - No lock contention (uses DashMap internally)
3727    /// - ~20-30x faster than equivalent MATCH query
3728    ///
3729    /// # Example
3730    ///
3731    /// ```no_run
3732    /// # use grafeo_engine::GrafeoDB;
3733    /// # let db = GrafeoDB::new_in_memory();
3734    /// let session = db.session();
3735    /// let node_id = session.create_node(&["Person"]);
3736    ///
3737    /// // Direct lookup - O(1), no query planning
3738    /// let node = session.get_node(node_id);
3739    /// assert!(node.is_some());
3740    /// ```
3741    #[must_use]
3742    pub fn get_node(&self, id: NodeId) -> Option<Node> {
3743        let (epoch, transaction_id) = self.get_transaction_context();
3744        self.active_lpg_store().get_node_versioned(
3745            id,
3746            epoch,
3747            transaction_id.unwrap_or(TransactionId::SYSTEM),
3748        )
3749    }
3750
3751    /// Gets a single property from a node by ID, bypassing query planning.
3752    ///
3753    /// More efficient than `get_node()` when you only need one property,
3754    /// as it avoids loading the full node with all properties.
3755    ///
3756    /// # Performance
3757    ///
3758    /// - Time complexity: O(1) average case
3759    /// - No query planning overhead
3760    ///
3761    /// # Example
3762    ///
3763    /// ```no_run
3764    /// # use grafeo_engine::GrafeoDB;
3765    /// # use grafeo_common::types::Value;
3766    /// # let db = GrafeoDB::new_in_memory();
3767    /// let session = db.session();
3768    /// let id = session.create_node_with_props(&["Person"], [("name", "Alix".into())]);
3769    ///
3770    /// // Direct property access - O(1)
3771    /// let name = session.get_node_property(id, "name");
3772    /// assert_eq!(name, Some(Value::String("Alix".into())));
3773    /// ```
3774    #[must_use]
3775    pub fn get_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
3776        self.get_node(id)
3777            .and_then(|node| node.get_property(key).cloned())
3778    }
3779
3780    /// Gets an edge by ID directly, bypassing query planning.
3781    ///
3782    /// # Performance
3783    ///
3784    /// - Time complexity: O(1) average case
3785    /// - No lock contention
3786    #[must_use]
3787    pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
3788        let (epoch, transaction_id) = self.get_transaction_context();
3789        self.active_lpg_store().get_edge_versioned(
3790            id,
3791            epoch,
3792            transaction_id.unwrap_or(TransactionId::SYSTEM),
3793        )
3794    }
3795
3796    /// Gets outgoing neighbors of a node directly, bypassing query planning.
3797    ///
3798    /// Returns (neighbor_id, edge_id) pairs for all outgoing edges.
3799    ///
3800    /// # Performance
3801    ///
3802    /// - Time complexity: O(degree) where degree is the number of outgoing edges
3803    /// - Uses adjacency index for direct access
3804    /// - ~10-20x faster than equivalent MATCH query
3805    ///
3806    /// # Example
3807    ///
3808    /// ```no_run
3809    /// # use grafeo_engine::GrafeoDB;
3810    /// # let db = GrafeoDB::new_in_memory();
3811    /// let session = db.session();
3812    /// let alix = session.create_node(&["Person"]);
3813    /// let gus = session.create_node(&["Person"]);
3814    /// session.create_edge(alix, gus, "KNOWS");
3815    ///
3816    /// // Direct neighbor lookup - O(degree)
3817    /// let neighbors = session.get_neighbors_outgoing(alix);
3818    /// assert_eq!(neighbors.len(), 1);
3819    /// assert_eq!(neighbors[0].0, gus);
3820    /// ```
3821    #[must_use]
3822    pub fn get_neighbors_outgoing(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
3823        self.active_lpg_store()
3824            .edges_from(node, Direction::Outgoing)
3825            .collect()
3826    }
3827
3828    /// Gets incoming neighbors of a node directly, bypassing query planning.
3829    ///
3830    /// Returns (neighbor_id, edge_id) pairs for all incoming edges.
3831    ///
3832    /// # Performance
3833    ///
3834    /// - Time complexity: O(degree) where degree is the number of incoming edges
3835    /// - Uses backward adjacency index for direct access
3836    #[must_use]
3837    pub fn get_neighbors_incoming(&self, node: NodeId) -> Vec<(NodeId, EdgeId)> {
3838        self.active_lpg_store()
3839            .edges_from(node, Direction::Incoming)
3840            .collect()
3841    }
3842
3843    /// Gets outgoing neighbors filtered by edge type, bypassing query planning.
3844    ///
3845    /// # Example
3846    ///
3847    /// ```no_run
3848    /// # use grafeo_engine::GrafeoDB;
3849    /// # let db = GrafeoDB::new_in_memory();
3850    /// # let session = db.session();
3851    /// # let alix = session.create_node(&["Person"]);
3852    /// let neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
3853    /// ```
3854    #[must_use]
3855    pub fn get_neighbors_outgoing_by_type(
3856        &self,
3857        node: NodeId,
3858        edge_type: &str,
3859    ) -> Vec<(NodeId, EdgeId)> {
3860        self.active_lpg_store()
3861            .edges_from(node, Direction::Outgoing)
3862            .filter(|(_, edge_id)| {
3863                self.get_edge(*edge_id)
3864                    .is_some_and(|e| e.edge_type.as_str() == edge_type)
3865            })
3866            .collect()
3867    }
3868
3869    /// Checks if a node exists, bypassing query planning.
3870    ///
3871    /// # Performance
3872    ///
3873    /// - Time complexity: O(1)
3874    /// - Fastest existence check available
3875    #[must_use]
3876    pub fn node_exists(&self, id: NodeId) -> bool {
3877        self.get_node(id).is_some()
3878    }
3879
3880    /// Checks if an edge exists, bypassing query planning.
3881    #[must_use]
3882    pub fn edge_exists(&self, id: EdgeId) -> bool {
3883        self.get_edge(id).is_some()
3884    }
3885
3886    /// Gets the degree (number of edges) of a node.
3887    ///
3888    /// Returns (outgoing_degree, incoming_degree).
3889    #[must_use]
3890    pub fn get_degree(&self, node: NodeId) -> (usize, usize) {
3891        let active = self.active_lpg_store();
3892        let out = active.out_degree(node);
3893        let in_degree = active.in_degree(node);
3894        (out, in_degree)
3895    }
3896
3897    /// Batch lookup of multiple nodes by ID.
3898    ///
3899    /// More efficient than calling `get_node()` in a loop because it
3900    /// amortizes overhead.
3901    ///
3902    /// # Performance
3903    ///
3904    /// - Time complexity: O(n) where n is the number of IDs
3905    /// - Better cache utilization than individual lookups
3906    #[must_use]
3907    pub fn get_nodes_batch(&self, ids: &[NodeId]) -> Vec<Option<Node>> {
3908        let (epoch, transaction_id) = self.get_transaction_context();
3909        let tx = transaction_id.unwrap_or(TransactionId::SYSTEM);
3910        let active = self.active_lpg_store();
3911        ids.iter()
3912            .map(|&id| active.get_node_versioned(id, epoch, tx))
3913            .collect()
3914    }
3915
3916    // ── Change Data Capture ─────────────────────────────────────────────
3917
3918    /// Returns the full change history for an entity (node or edge).
3919    #[cfg(feature = "cdc")]
3920    pub fn history(
3921        &self,
3922        entity_id: impl Into<crate::cdc::EntityId>,
3923    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
3924        Ok(self.cdc_log.history(entity_id.into()))
3925    }
3926
3927    /// Returns change events for an entity since the given epoch.
3928    #[cfg(feature = "cdc")]
3929    pub fn history_since(
3930        &self,
3931        entity_id: impl Into<crate::cdc::EntityId>,
3932        since_epoch: EpochId,
3933    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
3934        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
3935    }
3936
3937    /// Returns all change events across all entities in an epoch range.
3938    #[cfg(feature = "cdc")]
3939    pub fn changes_between(
3940        &self,
3941        start_epoch: EpochId,
3942        end_epoch: EpochId,
3943    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
3944        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
3945    }
3946}
3947
3948impl Drop for Session {
3949    fn drop(&mut self) {
3950        // Auto-rollback any active transaction to prevent leaked MVCC state,
3951        // dangling write locks, and uncommitted versions lingering in the store.
3952        if self.in_transaction() {
3953            let _ = self.rollback_inner();
3954        }
3955    }
3956}
3957
3958#[cfg(test)]
3959mod tests {
3960    use crate::database::GrafeoDB;
3961
3962    #[test]
3963    fn test_session_create_node() {
3964        let db = GrafeoDB::new_in_memory();
3965        let session = db.session();
3966
3967        let id = session.create_node(&["Person"]);
3968        assert!(id.is_valid());
3969        assert_eq!(db.node_count(), 1);
3970    }
3971
3972    #[test]
3973    fn test_session_transaction() {
3974        let db = GrafeoDB::new_in_memory();
3975        let mut session = db.session();
3976
3977        assert!(!session.in_transaction());
3978
3979        session.begin_transaction().unwrap();
3980        assert!(session.in_transaction());
3981
3982        session.commit().unwrap();
3983        assert!(!session.in_transaction());
3984    }
3985
3986    #[test]
3987    fn test_session_transaction_context() {
3988        let db = GrafeoDB::new_in_memory();
3989        let mut session = db.session();
3990
3991        // Without transaction - context should have current epoch and no transaction_id
3992        let (_epoch1, transaction_id1) = session.get_transaction_context();
3993        assert!(transaction_id1.is_none());
3994
3995        // Start a transaction
3996        session.begin_transaction().unwrap();
3997        let (epoch2, transaction_id2) = session.get_transaction_context();
3998        assert!(transaction_id2.is_some());
3999        // Transaction should have a valid epoch
4000        let _ = epoch2; // Use the variable
4001
4002        // Commit and verify
4003        session.commit().unwrap();
4004        let (epoch3, tx_id3) = session.get_transaction_context();
4005        assert!(tx_id3.is_none());
4006        // Epoch should have advanced after commit
4007        assert!(epoch3.as_u64() >= epoch2.as_u64());
4008    }
4009
4010    #[test]
4011    fn test_session_rollback() {
4012        let db = GrafeoDB::new_in_memory();
4013        let mut session = db.session();
4014
4015        session.begin_transaction().unwrap();
4016        session.rollback().unwrap();
4017        assert!(!session.in_transaction());
4018    }
4019
4020    #[test]
4021    fn test_session_rollback_discards_versions() {
4022        use grafeo_common::types::TransactionId;
4023
4024        let db = GrafeoDB::new_in_memory();
4025
4026        // Create a node outside of any transaction (at system level)
4027        let node_before = db.store().create_node(&["Person"]);
4028        assert!(node_before.is_valid());
4029        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4030
4031        // Start a transaction
4032        let mut session = db.session();
4033        session.begin_transaction().unwrap();
4034        let transaction_id = session.current_transaction.lock().unwrap();
4035
4036        // Create a node versioned with the transaction's ID
4037        let epoch = db.store().current_epoch();
4038        let node_in_tx = db
4039            .store()
4040            .create_node_versioned(&["Person"], epoch, transaction_id);
4041        assert!(node_in_tx.is_valid());
4042
4043        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4044        // non-versioned lookups like node_count(). Verify the node is visible
4045        // only through the owning transaction.
4046        assert_eq!(
4047            db.node_count(),
4048            1,
4049            "PENDING nodes should be invisible to non-versioned node_count()"
4050        );
4051        assert!(
4052            db.store()
4053                .get_node_versioned(node_in_tx, epoch, transaction_id)
4054                .is_some(),
4055            "Transaction node should be visible to its own transaction"
4056        );
4057
4058        // Rollback the transaction
4059        session.rollback().unwrap();
4060        assert!(!session.in_transaction());
4061
4062        // The node created in the transaction should be discarded
4063        // Only the first node should remain visible
4064        let count_after = db.node_count();
4065        assert_eq!(
4066            count_after, 1,
4067            "Rollback should discard uncommitted node, but got {count_after}"
4068        );
4069
4070        // The original node should still be accessible
4071        let current_epoch = db.store().current_epoch();
4072        assert!(
4073            db.store()
4074                .get_node_versioned(node_before, current_epoch, TransactionId::SYSTEM)
4075                .is_some(),
4076            "Original node should still exist"
4077        );
4078
4079        // The node created in the transaction should not be accessible
4080        assert!(
4081            db.store()
4082                .get_node_versioned(node_in_tx, current_epoch, TransactionId::SYSTEM)
4083                .is_none(),
4084            "Transaction node should be gone"
4085        );
4086    }
4087
4088    #[test]
4089    fn test_session_create_node_in_transaction() {
4090        // Test that session.create_node() is transaction-aware
4091        let db = GrafeoDB::new_in_memory();
4092
4093        // Create a node outside of any transaction
4094        let node_before = db.create_node(&["Person"]);
4095        assert!(node_before.is_valid());
4096        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4097
4098        // Start a transaction and create a node through the session
4099        let mut session = db.session();
4100        session.begin_transaction().unwrap();
4101        let transaction_id = session.current_transaction.lock().unwrap();
4102
4103        // Create a node through session.create_node() - should be versioned with tx
4104        let node_in_tx = session.create_node(&["Person"]);
4105        assert!(node_in_tx.is_valid());
4106
4107        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4108        // non-versioned lookups. Verify the node is visible only to its own tx.
4109        assert_eq!(
4110            db.node_count(),
4111            1,
4112            "PENDING nodes should be invisible to non-versioned node_count()"
4113        );
4114        let epoch = db.store().current_epoch();
4115        assert!(
4116            db.store()
4117                .get_node_versioned(node_in_tx, epoch, transaction_id)
4118                .is_some(),
4119            "Transaction node should be visible to its own transaction"
4120        );
4121
4122        // Rollback the transaction
4123        session.rollback().unwrap();
4124
4125        // The node created via session.create_node() should be discarded
4126        let count_after = db.node_count();
4127        assert_eq!(
4128            count_after, 1,
4129            "Rollback should discard node created via session.create_node(), but got {count_after}"
4130        );
4131    }
4132
4133    #[test]
4134    fn test_session_create_node_with_props_in_transaction() {
4135        use grafeo_common::types::Value;
4136
4137        // Test that session.create_node_with_props() is transaction-aware
4138        let db = GrafeoDB::new_in_memory();
4139
4140        // Create a node outside of any transaction
4141        db.create_node(&["Person"]);
4142        assert_eq!(db.node_count(), 1, "Should have 1 node before transaction");
4143
4144        // Start a transaction and create a node with properties
4145        let mut session = db.session();
4146        session.begin_transaction().unwrap();
4147        let transaction_id = session.current_transaction.lock().unwrap();
4148
4149        let node_in_tx =
4150            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4151        assert!(node_in_tx.is_valid());
4152
4153        // Uncommitted nodes use EpochId::PENDING, so they are invisible to
4154        // non-versioned lookups. Verify the node is visible only to its own tx.
4155        assert_eq!(
4156            db.node_count(),
4157            1,
4158            "PENDING nodes should be invisible to non-versioned node_count()"
4159        );
4160        let epoch = db.store().current_epoch();
4161        assert!(
4162            db.store()
4163                .get_node_versioned(node_in_tx, epoch, transaction_id)
4164                .is_some(),
4165            "Transaction node should be visible to its own transaction"
4166        );
4167
4168        // Rollback the transaction
4169        session.rollback().unwrap();
4170
4171        // The node should be discarded
4172        let count_after = db.node_count();
4173        assert_eq!(
4174            count_after, 1,
4175            "Rollback should discard node created via session.create_node_with_props()"
4176        );
4177    }
4178
4179    #[cfg(feature = "gql")]
4180    mod gql_tests {
4181        use super::*;
4182
4183        #[test]
4184        fn test_gql_query_execution() {
4185            let db = GrafeoDB::new_in_memory();
4186            let session = db.session();
4187
4188            // Create some test data
4189            session.create_node(&["Person"]);
4190            session.create_node(&["Person"]);
4191            session.create_node(&["Animal"]);
4192
4193            // Execute a GQL query
4194            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
4195
4196            // Should return 2 Person nodes
4197            assert_eq!(result.row_count(), 2);
4198            assert_eq!(result.column_count(), 1);
4199            assert_eq!(result.columns[0], "n");
4200        }
4201
4202        #[test]
4203        fn test_gql_empty_result() {
4204            let db = GrafeoDB::new_in_memory();
4205            let session = db.session();
4206
4207            // No data in database
4208            let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
4209
4210            assert_eq!(result.row_count(), 0);
4211        }
4212
4213        #[test]
4214        fn test_gql_parse_error() {
4215            let db = GrafeoDB::new_in_memory();
4216            let session = db.session();
4217
4218            // Invalid GQL syntax
4219            let result = session.execute("MATCH (n RETURN n");
4220
4221            assert!(result.is_err());
4222        }
4223
4224        #[test]
4225        fn test_gql_relationship_traversal() {
4226            let db = GrafeoDB::new_in_memory();
4227            let session = db.session();
4228
4229            // Create a graph: Alix -> Gus, Alix -> Vincent
4230            let alix = session.create_node(&["Person"]);
4231            let gus = session.create_node(&["Person"]);
4232            let vincent = session.create_node(&["Person"]);
4233
4234            session.create_edge(alix, gus, "KNOWS");
4235            session.create_edge(alix, vincent, "KNOWS");
4236
4237            // Execute a path query: MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b
4238            let result = session
4239                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4240                .unwrap();
4241
4242            // Should return 2 rows (Alix->Gus, Alix->Vincent)
4243            assert_eq!(result.row_count(), 2);
4244            assert_eq!(result.column_count(), 2);
4245            assert_eq!(result.columns[0], "a");
4246            assert_eq!(result.columns[1], "b");
4247        }
4248
4249        #[test]
4250        fn test_gql_relationship_with_type_filter() {
4251            let db = GrafeoDB::new_in_memory();
4252            let session = db.session();
4253
4254            // Create a graph: Alix -KNOWS-> Gus, Alix -WORKS_WITH-> Vincent
4255            let alix = session.create_node(&["Person"]);
4256            let gus = session.create_node(&["Person"]);
4257            let vincent = session.create_node(&["Person"]);
4258
4259            session.create_edge(alix, gus, "KNOWS");
4260            session.create_edge(alix, vincent, "WORKS_WITH");
4261
4262            // Query only KNOWS relationships
4263            let result = session
4264                .execute("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a, b")
4265                .unwrap();
4266
4267            // Should return only 1 row (Alix->Gus)
4268            assert_eq!(result.row_count(), 1);
4269        }
4270
4271        #[test]
4272        fn test_gql_semantic_error_undefined_variable() {
4273            let db = GrafeoDB::new_in_memory();
4274            let session = db.session();
4275
4276            // Reference undefined variable 'x' in RETURN
4277            let result = session.execute("MATCH (n:Person) RETURN x");
4278
4279            // Should fail with semantic error
4280            assert!(result.is_err());
4281            let Err(err) = result else {
4282                panic!("Expected error")
4283            };
4284            assert!(
4285                err.to_string().contains("Undefined variable"),
4286                "Expected undefined variable error, got: {}",
4287                err
4288            );
4289        }
4290
4291        #[test]
4292        fn test_gql_where_clause_property_filter() {
4293            use grafeo_common::types::Value;
4294
4295            let db = GrafeoDB::new_in_memory();
4296            let session = db.session();
4297
4298            // Create people with ages
4299            session.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
4300            session.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
4301            session.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
4302
4303            // Query with WHERE clause: age > 30
4304            let result = session
4305                .execute("MATCH (n:Person) WHERE n.age > 30 RETURN n")
4306                .unwrap();
4307
4308            // Should return 2 people (ages 35 and 45)
4309            assert_eq!(result.row_count(), 2);
4310        }
4311
4312        #[test]
4313        fn test_gql_where_clause_equality() {
4314            use grafeo_common::types::Value;
4315
4316            let db = GrafeoDB::new_in_memory();
4317            let session = db.session();
4318
4319            // Create people with names
4320            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4321            session.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
4322            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4323
4324            // Query with WHERE clause: name = "Alix"
4325            let result = session
4326                .execute("MATCH (n:Person) WHERE n.name = \"Alix\" RETURN n")
4327                .unwrap();
4328
4329            // Should return 2 people named Alix
4330            assert_eq!(result.row_count(), 2);
4331        }
4332
4333        #[test]
4334        fn test_gql_return_property_access() {
4335            use grafeo_common::types::Value;
4336
4337            let db = GrafeoDB::new_in_memory();
4338            let session = db.session();
4339
4340            // Create people with names and ages
4341            session.create_node_with_props(
4342                &["Person"],
4343                [
4344                    ("name", Value::String("Alix".into())),
4345                    ("age", Value::Int64(30)),
4346                ],
4347            );
4348            session.create_node_with_props(
4349                &["Person"],
4350                [
4351                    ("name", Value::String("Gus".into())),
4352                    ("age", Value::Int64(25)),
4353                ],
4354            );
4355
4356            // Query returning properties
4357            let result = session
4358                .execute("MATCH (n:Person) RETURN n.name, n.age")
4359                .unwrap();
4360
4361            // Should return 2 rows with name and age columns
4362            assert_eq!(result.row_count(), 2);
4363            assert_eq!(result.column_count(), 2);
4364            assert_eq!(result.columns[0], "n.name");
4365            assert_eq!(result.columns[1], "n.age");
4366
4367            // Check that we get actual values
4368            let names: Vec<&Value> = result.rows.iter().map(|r| &r[0]).collect();
4369            assert!(names.contains(&&Value::String("Alix".into())));
4370            assert!(names.contains(&&Value::String("Gus".into())));
4371        }
4372
4373        #[test]
4374        fn test_gql_return_mixed_expressions() {
4375            use grafeo_common::types::Value;
4376
4377            let db = GrafeoDB::new_in_memory();
4378            let session = db.session();
4379
4380            // Create a person
4381            session.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4382
4383            // Query returning both node and property
4384            let result = session
4385                .execute("MATCH (n:Person) RETURN n, n.name")
4386                .unwrap();
4387
4388            assert_eq!(result.row_count(), 1);
4389            assert_eq!(result.column_count(), 2);
4390            assert_eq!(result.columns[0], "n");
4391            assert_eq!(result.columns[1], "n.name");
4392
4393            // Second column should be the name
4394            assert_eq!(result.rows[0][1], Value::String("Alix".into()));
4395        }
4396    }
4397
4398    #[cfg(feature = "cypher")]
4399    mod cypher_tests {
4400        use super::*;
4401
4402        #[test]
4403        fn test_cypher_query_execution() {
4404            let db = GrafeoDB::new_in_memory();
4405            let session = db.session();
4406
4407            // Create some test data
4408            session.create_node(&["Person"]);
4409            session.create_node(&["Person"]);
4410            session.create_node(&["Animal"]);
4411
4412            // Execute a Cypher query
4413            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4414
4415            // Should return 2 Person nodes
4416            assert_eq!(result.row_count(), 2);
4417            assert_eq!(result.column_count(), 1);
4418            assert_eq!(result.columns[0], "n");
4419        }
4420
4421        #[test]
4422        fn test_cypher_empty_result() {
4423            let db = GrafeoDB::new_in_memory();
4424            let session = db.session();
4425
4426            // No data in database
4427            let result = session.execute_cypher("MATCH (n:Person) RETURN n").unwrap();
4428
4429            assert_eq!(result.row_count(), 0);
4430        }
4431
4432        #[test]
4433        fn test_cypher_parse_error() {
4434            let db = GrafeoDB::new_in_memory();
4435            let session = db.session();
4436
4437            // Invalid Cypher syntax
4438            let result = session.execute_cypher("MATCH (n RETURN n");
4439
4440            assert!(result.is_err());
4441        }
4442    }
4443
4444    // ==================== Direct Lookup API Tests ====================
4445
4446    mod direct_lookup_tests {
4447        use super::*;
4448        use grafeo_common::types::Value;
4449
4450        #[test]
4451        fn test_get_node() {
4452            let db = GrafeoDB::new_in_memory();
4453            let session = db.session();
4454
4455            let id = session.create_node(&["Person"]);
4456            let node = session.get_node(id);
4457
4458            assert!(node.is_some());
4459            let node = node.unwrap();
4460            assert_eq!(node.id, id);
4461        }
4462
4463        #[test]
4464        fn test_get_node_not_found() {
4465            use grafeo_common::types::NodeId;
4466
4467            let db = GrafeoDB::new_in_memory();
4468            let session = db.session();
4469
4470            // Try to get a non-existent node
4471            let node = session.get_node(NodeId::new(9999));
4472            assert!(node.is_none());
4473        }
4474
4475        #[test]
4476        fn test_get_node_property() {
4477            let db = GrafeoDB::new_in_memory();
4478            let session = db.session();
4479
4480            let id = session
4481                .create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
4482
4483            let name = session.get_node_property(id, "name");
4484            assert_eq!(name, Some(Value::String("Alix".into())));
4485
4486            // Non-existent property
4487            let missing = session.get_node_property(id, "missing");
4488            assert!(missing.is_none());
4489        }
4490
4491        #[test]
4492        fn test_get_edge() {
4493            let db = GrafeoDB::new_in_memory();
4494            let session = db.session();
4495
4496            let alix = session.create_node(&["Person"]);
4497            let gus = session.create_node(&["Person"]);
4498            let edge_id = session.create_edge(alix, gus, "KNOWS");
4499
4500            let edge = session.get_edge(edge_id);
4501            assert!(edge.is_some());
4502            let edge = edge.unwrap();
4503            assert_eq!(edge.id, edge_id);
4504            assert_eq!(edge.src, alix);
4505            assert_eq!(edge.dst, gus);
4506        }
4507
4508        #[test]
4509        fn test_get_edge_not_found() {
4510            use grafeo_common::types::EdgeId;
4511
4512            let db = GrafeoDB::new_in_memory();
4513            let session = db.session();
4514
4515            let edge = session.get_edge(EdgeId::new(9999));
4516            assert!(edge.is_none());
4517        }
4518
4519        #[test]
4520        fn test_get_neighbors_outgoing() {
4521            let db = GrafeoDB::new_in_memory();
4522            let session = db.session();
4523
4524            let alix = session.create_node(&["Person"]);
4525            let gus = session.create_node(&["Person"]);
4526            let harm = session.create_node(&["Person"]);
4527
4528            session.create_edge(alix, gus, "KNOWS");
4529            session.create_edge(alix, harm, "KNOWS");
4530
4531            let neighbors = session.get_neighbors_outgoing(alix);
4532            assert_eq!(neighbors.len(), 2);
4533
4534            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4535            assert!(neighbor_ids.contains(&gus));
4536            assert!(neighbor_ids.contains(&harm));
4537        }
4538
4539        #[test]
4540        fn test_get_neighbors_incoming() {
4541            let db = GrafeoDB::new_in_memory();
4542            let session = db.session();
4543
4544            let alix = session.create_node(&["Person"]);
4545            let gus = session.create_node(&["Person"]);
4546            let harm = session.create_node(&["Person"]);
4547
4548            session.create_edge(gus, alix, "KNOWS");
4549            session.create_edge(harm, alix, "KNOWS");
4550
4551            let neighbors = session.get_neighbors_incoming(alix);
4552            assert_eq!(neighbors.len(), 2);
4553
4554            let neighbor_ids: Vec<_> = neighbors.iter().map(|(node_id, _)| *node_id).collect();
4555            assert!(neighbor_ids.contains(&gus));
4556            assert!(neighbor_ids.contains(&harm));
4557        }
4558
4559        #[test]
4560        fn test_get_neighbors_outgoing_by_type() {
4561            let db = GrafeoDB::new_in_memory();
4562            let session = db.session();
4563
4564            let alix = session.create_node(&["Person"]);
4565            let gus = session.create_node(&["Person"]);
4566            let company = session.create_node(&["Company"]);
4567
4568            session.create_edge(alix, gus, "KNOWS");
4569            session.create_edge(alix, company, "WORKS_AT");
4570
4571            let knows_neighbors = session.get_neighbors_outgoing_by_type(alix, "KNOWS");
4572            assert_eq!(knows_neighbors.len(), 1);
4573            assert_eq!(knows_neighbors[0].0, gus);
4574
4575            let works_neighbors = session.get_neighbors_outgoing_by_type(alix, "WORKS_AT");
4576            assert_eq!(works_neighbors.len(), 1);
4577            assert_eq!(works_neighbors[0].0, company);
4578
4579            // No edges of this type
4580            let no_neighbors = session.get_neighbors_outgoing_by_type(alix, "LIKES");
4581            assert!(no_neighbors.is_empty());
4582        }
4583
4584        #[test]
4585        fn test_node_exists() {
4586            use grafeo_common::types::NodeId;
4587
4588            let db = GrafeoDB::new_in_memory();
4589            let session = db.session();
4590
4591            let id = session.create_node(&["Person"]);
4592
4593            assert!(session.node_exists(id));
4594            assert!(!session.node_exists(NodeId::new(9999)));
4595        }
4596
4597        #[test]
4598        fn test_edge_exists() {
4599            use grafeo_common::types::EdgeId;
4600
4601            let db = GrafeoDB::new_in_memory();
4602            let session = db.session();
4603
4604            let alix = session.create_node(&["Person"]);
4605            let gus = session.create_node(&["Person"]);
4606            let edge_id = session.create_edge(alix, gus, "KNOWS");
4607
4608            assert!(session.edge_exists(edge_id));
4609            assert!(!session.edge_exists(EdgeId::new(9999)));
4610        }
4611
4612        #[test]
4613        fn test_get_degree() {
4614            let db = GrafeoDB::new_in_memory();
4615            let session = db.session();
4616
4617            let alix = session.create_node(&["Person"]);
4618            let gus = session.create_node(&["Person"]);
4619            let harm = session.create_node(&["Person"]);
4620
4621            // Alix knows Gus and Harm (2 outgoing)
4622            session.create_edge(alix, gus, "KNOWS");
4623            session.create_edge(alix, harm, "KNOWS");
4624            // Gus knows Alix (1 incoming for Alix)
4625            session.create_edge(gus, alix, "KNOWS");
4626
4627            let (out_degree, in_degree) = session.get_degree(alix);
4628            assert_eq!(out_degree, 2);
4629            assert_eq!(in_degree, 1);
4630
4631            // Node with no edges
4632            let lonely = session.create_node(&["Person"]);
4633            let (out, in_deg) = session.get_degree(lonely);
4634            assert_eq!(out, 0);
4635            assert_eq!(in_deg, 0);
4636        }
4637
4638        #[test]
4639        fn test_get_nodes_batch() {
4640            let db = GrafeoDB::new_in_memory();
4641            let session = db.session();
4642
4643            let alix = session.create_node(&["Person"]);
4644            let gus = session.create_node(&["Person"]);
4645            let harm = session.create_node(&["Person"]);
4646
4647            let nodes = session.get_nodes_batch(&[alix, gus, harm]);
4648            assert_eq!(nodes.len(), 3);
4649            assert!(nodes[0].is_some());
4650            assert!(nodes[1].is_some());
4651            assert!(nodes[2].is_some());
4652
4653            // With non-existent node
4654            use grafeo_common::types::NodeId;
4655            let nodes_with_missing = session.get_nodes_batch(&[alix, NodeId::new(9999), harm]);
4656            assert_eq!(nodes_with_missing.len(), 3);
4657            assert!(nodes_with_missing[0].is_some());
4658            assert!(nodes_with_missing[1].is_none()); // Missing node
4659            assert!(nodes_with_missing[2].is_some());
4660        }
4661
4662        #[test]
4663        fn test_auto_commit_setting() {
4664            let db = GrafeoDB::new_in_memory();
4665            let mut session = db.session();
4666
4667            // Default is auto-commit enabled
4668            assert!(session.auto_commit());
4669
4670            session.set_auto_commit(false);
4671            assert!(!session.auto_commit());
4672
4673            session.set_auto_commit(true);
4674            assert!(session.auto_commit());
4675        }
4676
4677        #[test]
4678        fn test_transaction_double_begin_nests() {
4679            let db = GrafeoDB::new_in_memory();
4680            let mut session = db.session();
4681
4682            session.begin_transaction().unwrap();
4683            // Second begin_transaction creates a nested transaction (auto-savepoint)
4684            let result = session.begin_transaction();
4685            assert!(result.is_ok());
4686            // Commit the inner (releases savepoint)
4687            session.commit().unwrap();
4688            // Commit the outer
4689            session.commit().unwrap();
4690        }
4691
4692        #[test]
4693        fn test_commit_without_transaction_error() {
4694            let db = GrafeoDB::new_in_memory();
4695            let mut session = db.session();
4696
4697            let result = session.commit();
4698            assert!(result.is_err());
4699        }
4700
4701        #[test]
4702        fn test_rollback_without_transaction_error() {
4703            let db = GrafeoDB::new_in_memory();
4704            let mut session = db.session();
4705
4706            let result = session.rollback();
4707            assert!(result.is_err());
4708        }
4709
4710        #[test]
4711        fn test_create_edge_in_transaction() {
4712            let db = GrafeoDB::new_in_memory();
4713            let mut session = db.session();
4714
4715            // Create nodes outside transaction
4716            let alix = session.create_node(&["Person"]);
4717            let gus = session.create_node(&["Person"]);
4718
4719            // Create edge in transaction
4720            session.begin_transaction().unwrap();
4721            let edge_id = session.create_edge(alix, gus, "KNOWS");
4722
4723            // Edge should be visible in the transaction
4724            assert!(session.edge_exists(edge_id));
4725
4726            // Commit
4727            session.commit().unwrap();
4728
4729            // Edge should still be visible
4730            assert!(session.edge_exists(edge_id));
4731        }
4732
4733        #[test]
4734        fn test_neighbors_empty_node() {
4735            let db = GrafeoDB::new_in_memory();
4736            let session = db.session();
4737
4738            let lonely = session.create_node(&["Person"]);
4739
4740            assert!(session.get_neighbors_outgoing(lonely).is_empty());
4741            assert!(session.get_neighbors_incoming(lonely).is_empty());
4742            assert!(
4743                session
4744                    .get_neighbors_outgoing_by_type(lonely, "KNOWS")
4745                    .is_empty()
4746            );
4747        }
4748    }
4749
4750    #[test]
4751    fn test_auto_gc_triggers_on_commit_interval() {
4752        use crate::config::Config;
4753
4754        let config = Config::in_memory().with_gc_interval(2);
4755        let db = GrafeoDB::with_config(config).unwrap();
4756        let mut session = db.session();
4757
4758        // First commit: counter = 1, no GC (not a multiple of 2)
4759        session.begin_transaction().unwrap();
4760        session.create_node(&["A"]);
4761        session.commit().unwrap();
4762
4763        // Second commit: counter = 2, GC should trigger (multiple of 2)
4764        session.begin_transaction().unwrap();
4765        session.create_node(&["B"]);
4766        session.commit().unwrap();
4767
4768        // Verify the database is still functional after GC
4769        assert_eq!(db.node_count(), 2);
4770    }
4771
4772    #[test]
4773    fn test_query_timeout_config_propagates_to_session() {
4774        use crate::config::Config;
4775        use std::time::Duration;
4776
4777        let config = Config::in_memory().with_query_timeout(Duration::from_secs(5));
4778        let db = GrafeoDB::with_config(config).unwrap();
4779        let session = db.session();
4780
4781        // Verify the session has a query deadline (timeout was set)
4782        assert!(session.query_deadline().is_some());
4783    }
4784
4785    #[test]
4786    fn test_no_query_timeout_returns_no_deadline() {
4787        let db = GrafeoDB::new_in_memory();
4788        let session = db.session();
4789
4790        // Default config has no timeout
4791        assert!(session.query_deadline().is_none());
4792    }
4793
4794    #[test]
4795    fn test_graph_model_accessor() {
4796        use crate::config::GraphModel;
4797
4798        let db = GrafeoDB::new_in_memory();
4799        let session = db.session();
4800
4801        assert_eq!(session.graph_model(), GraphModel::Lpg);
4802    }
4803
4804    #[cfg(feature = "gql")]
4805    #[test]
4806    fn test_external_store_session() {
4807        use grafeo_core::graph::GraphStoreMut;
4808        use std::sync::Arc;
4809
4810        let config = crate::config::Config::in_memory();
4811        let store =
4812            Arc::new(grafeo_core::graph::lpg::LpgStore::new().unwrap()) as Arc<dyn GraphStoreMut>;
4813        let db = GrafeoDB::with_store(store, config).unwrap();
4814
4815        let mut session = db.session();
4816
4817        // Use an explicit transaction so that INSERT and MATCH share the same
4818        // transaction context. With PENDING epochs, uncommitted versions are
4819        // only visible to the owning transaction.
4820        session.begin_transaction().unwrap();
4821        session.execute("INSERT (:Test {name: 'hello'})").unwrap();
4822
4823        // Verify we can query through it within the same transaction
4824        let result = session.execute("MATCH (n:Test) RETURN n.name").unwrap();
4825        assert_eq!(result.row_count(), 1);
4826
4827        session.commit().unwrap();
4828    }
4829
4830    // ==================== Session Command Tests ====================
4831
4832    #[cfg(feature = "gql")]
4833    mod session_command_tests {
4834        use super::*;
4835        use grafeo_common::types::Value;
4836
4837        #[test]
4838        fn test_use_graph_sets_current_graph() {
4839            let db = GrafeoDB::new_in_memory();
4840            let session = db.session();
4841
4842            // Create the graph first, then USE it
4843            session.execute("CREATE GRAPH mydb").unwrap();
4844            session.execute("USE GRAPH mydb").unwrap();
4845
4846            assert_eq!(session.current_graph(), Some("mydb".to_string()));
4847        }
4848
4849        #[test]
4850        fn test_use_graph_nonexistent_errors() {
4851            let db = GrafeoDB::new_in_memory();
4852            let session = db.session();
4853
4854            let result = session.execute("USE GRAPH doesnotexist");
4855            assert!(result.is_err());
4856            let err = result.unwrap_err().to_string();
4857            assert!(
4858                err.contains("does not exist"),
4859                "Expected 'does not exist' error, got: {err}"
4860            );
4861        }
4862
4863        #[test]
4864        fn test_use_graph_default_always_valid() {
4865            let db = GrafeoDB::new_in_memory();
4866            let session = db.session();
4867
4868            // "default" is always valid, even without CREATE GRAPH
4869            session.execute("USE GRAPH default").unwrap();
4870            assert_eq!(session.current_graph(), Some("default".to_string()));
4871        }
4872
4873        #[test]
4874        fn test_session_set_graph() {
4875            let db = GrafeoDB::new_in_memory();
4876            let session = db.session();
4877
4878            session.execute("CREATE GRAPH analytics").unwrap();
4879            session.execute("SESSION SET GRAPH analytics").unwrap();
4880            assert_eq!(session.current_graph(), Some("analytics".to_string()));
4881        }
4882
4883        #[test]
4884        fn test_session_set_graph_nonexistent_errors() {
4885            let db = GrafeoDB::new_in_memory();
4886            let session = db.session();
4887
4888            let result = session.execute("SESSION SET GRAPH nosuchgraph");
4889            assert!(result.is_err());
4890        }
4891
4892        #[test]
4893        fn test_session_set_time_zone() {
4894            let db = GrafeoDB::new_in_memory();
4895            let session = db.session();
4896
4897            assert_eq!(session.time_zone(), None);
4898
4899            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
4900            assert_eq!(session.time_zone(), Some("UTC".to_string()));
4901
4902            session
4903                .execute("SESSION SET TIME ZONE 'America/New_York'")
4904                .unwrap();
4905            assert_eq!(session.time_zone(), Some("America/New_York".to_string()));
4906        }
4907
4908        #[test]
4909        fn test_session_set_parameter() {
4910            let db = GrafeoDB::new_in_memory();
4911            let session = db.session();
4912
4913            session
4914                .execute("SESSION SET PARAMETER $timeout = 30")
4915                .unwrap();
4916
4917            // Parameter is stored (value is Null for now, since expression
4918            // evaluation is not yet wired up)
4919            assert!(session.get_parameter("timeout").is_some());
4920        }
4921
4922        #[test]
4923        fn test_session_reset_clears_all_state() {
4924            let db = GrafeoDB::new_in_memory();
4925            let session = db.session();
4926
4927            // Set various session state
4928            session.execute("CREATE GRAPH analytics").unwrap();
4929            session.execute("SESSION SET GRAPH analytics").unwrap();
4930            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
4931            session
4932                .execute("SESSION SET PARAMETER $limit = 100")
4933                .unwrap();
4934
4935            // Verify state was set
4936            assert!(session.current_graph().is_some());
4937            assert!(session.time_zone().is_some());
4938            assert!(session.get_parameter("limit").is_some());
4939
4940            // Reset everything
4941            session.execute("SESSION RESET").unwrap();
4942
4943            assert_eq!(session.current_graph(), None);
4944            assert_eq!(session.time_zone(), None);
4945            assert!(session.get_parameter("limit").is_none());
4946        }
4947
4948        #[test]
4949        fn test_session_close_clears_state() {
4950            let db = GrafeoDB::new_in_memory();
4951            let session = db.session();
4952
4953            session.execute("CREATE GRAPH analytics").unwrap();
4954            session.execute("SESSION SET GRAPH analytics").unwrap();
4955            session.execute("SESSION SET TIME ZONE 'UTC'").unwrap();
4956
4957            session.execute("SESSION CLOSE").unwrap();
4958
4959            assert_eq!(session.current_graph(), None);
4960            assert_eq!(session.time_zone(), None);
4961        }
4962
4963        #[test]
4964        fn test_create_graph() {
4965            let db = GrafeoDB::new_in_memory();
4966            let session = db.session();
4967
4968            session.execute("CREATE GRAPH mydb").unwrap();
4969
4970            // Should be able to USE it now
4971            session.execute("USE GRAPH mydb").unwrap();
4972            assert_eq!(session.current_graph(), Some("mydb".to_string()));
4973        }
4974
4975        #[test]
4976        fn test_create_graph_duplicate_errors() {
4977            let db = GrafeoDB::new_in_memory();
4978            let session = db.session();
4979
4980            session.execute("CREATE GRAPH mydb").unwrap();
4981            let result = session.execute("CREATE GRAPH mydb");
4982
4983            assert!(result.is_err());
4984            let err = result.unwrap_err().to_string();
4985            assert!(
4986                err.contains("already exists"),
4987                "Expected 'already exists' error, got: {err}"
4988            );
4989        }
4990
4991        #[test]
4992        fn test_create_graph_if_not_exists() {
4993            let db = GrafeoDB::new_in_memory();
4994            let session = db.session();
4995
4996            session.execute("CREATE GRAPH mydb").unwrap();
4997            // Should succeed silently with IF NOT EXISTS
4998            session.execute("CREATE GRAPH IF NOT EXISTS mydb").unwrap();
4999        }
5000
5001        #[test]
5002        fn test_drop_graph() {
5003            let db = GrafeoDB::new_in_memory();
5004            let session = db.session();
5005
5006            session.execute("CREATE GRAPH mydb").unwrap();
5007            session.execute("DROP GRAPH mydb").unwrap();
5008
5009            // Should no longer be usable
5010            let result = session.execute("USE GRAPH mydb");
5011            assert!(result.is_err());
5012        }
5013
5014        #[test]
5015        fn test_drop_graph_nonexistent_errors() {
5016            let db = GrafeoDB::new_in_memory();
5017            let session = db.session();
5018
5019            let result = session.execute("DROP GRAPH nosuchgraph");
5020            assert!(result.is_err());
5021            let err = result.unwrap_err().to_string();
5022            assert!(
5023                err.contains("does not exist"),
5024                "Expected 'does not exist' error, got: {err}"
5025            );
5026        }
5027
5028        #[test]
5029        fn test_drop_graph_if_exists() {
5030            let db = GrafeoDB::new_in_memory();
5031            let session = db.session();
5032
5033            // Should succeed silently with IF EXISTS
5034            session.execute("DROP GRAPH IF EXISTS nosuchgraph").unwrap();
5035        }
5036
5037        #[test]
5038        fn test_start_transaction_via_gql() {
5039            let db = GrafeoDB::new_in_memory();
5040            let session = db.session();
5041
5042            session.execute("START TRANSACTION").unwrap();
5043            assert!(session.in_transaction());
5044            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5045            session.execute("COMMIT").unwrap();
5046            assert!(!session.in_transaction());
5047
5048            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5049            assert_eq!(result.rows.len(), 1);
5050        }
5051
5052        #[test]
5053        fn test_start_transaction_read_only_blocks_insert() {
5054            let db = GrafeoDB::new_in_memory();
5055            let session = db.session();
5056
5057            session.execute("START TRANSACTION READ ONLY").unwrap();
5058            let result = session.execute("INSERT (:Person {name: 'Alix'})");
5059            assert!(result.is_err());
5060            let err = result.unwrap_err().to_string();
5061            assert!(
5062                err.contains("read-only"),
5063                "Expected read-only error, got: {err}"
5064            );
5065            session.execute("ROLLBACK").unwrap();
5066        }
5067
5068        #[test]
5069        fn test_start_transaction_read_only_allows_reads() {
5070            let db = GrafeoDB::new_in_memory();
5071            let mut session = db.session();
5072            session.begin_transaction().unwrap();
5073            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5074            session.commit().unwrap();
5075
5076            session.execute("START TRANSACTION READ ONLY").unwrap();
5077            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5078            assert_eq!(result.rows.len(), 1);
5079            session.execute("COMMIT").unwrap();
5080        }
5081
5082        #[test]
5083        fn test_rollback_via_gql() {
5084            let db = GrafeoDB::new_in_memory();
5085            let session = db.session();
5086
5087            session.execute("START TRANSACTION").unwrap();
5088            session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
5089            session.execute("ROLLBACK").unwrap();
5090
5091            let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
5092            assert!(result.rows.is_empty());
5093        }
5094
5095        #[test]
5096        fn test_start_transaction_with_isolation_level() {
5097            let db = GrafeoDB::new_in_memory();
5098            let session = db.session();
5099
5100            session
5101                .execute("START TRANSACTION ISOLATION LEVEL SERIALIZABLE")
5102                .unwrap();
5103            assert!(session.in_transaction());
5104            session.execute("ROLLBACK").unwrap();
5105        }
5106
5107        #[test]
5108        fn test_session_commands_return_empty_result() {
5109            let db = GrafeoDB::new_in_memory();
5110            let session = db.session();
5111
5112            session.execute("CREATE GRAPH test").unwrap();
5113            let result = session.execute("SESSION SET GRAPH test").unwrap();
5114            assert_eq!(result.row_count(), 0);
5115            assert_eq!(result.column_count(), 0);
5116        }
5117
5118        #[test]
5119        fn test_current_graph_default_is_none() {
5120            let db = GrafeoDB::new_in_memory();
5121            let session = db.session();
5122
5123            assert_eq!(session.current_graph(), None);
5124        }
5125
5126        #[test]
5127        fn test_time_zone_default_is_none() {
5128            let db = GrafeoDB::new_in_memory();
5129            let session = db.session();
5130
5131            assert_eq!(session.time_zone(), None);
5132        }
5133
5134        #[test]
5135        fn test_session_state_independent_across_sessions() {
5136            let db = GrafeoDB::new_in_memory();
5137            let session1 = db.session();
5138            let session2 = db.session();
5139
5140            session1.execute("CREATE GRAPH first").unwrap();
5141            session1.execute("CREATE GRAPH second").unwrap();
5142            session1.execute("SESSION SET GRAPH first").unwrap();
5143            session2.execute("SESSION SET GRAPH second").unwrap();
5144
5145            assert_eq!(session1.current_graph(), Some("first".to_string()));
5146            assert_eq!(session2.current_graph(), Some("second".to_string()));
5147        }
5148
5149        #[test]
5150        fn test_show_node_types() {
5151            let db = GrafeoDB::new_in_memory();
5152            let session = db.session();
5153
5154            session
5155                .execute("CREATE NODE TYPE Person (name STRING NOT NULL, age INTEGER)")
5156                .unwrap();
5157
5158            let result = session.execute("SHOW NODE TYPES").unwrap();
5159            assert_eq!(
5160                result.columns,
5161                vec!["name", "properties", "constraints", "parents"]
5162            );
5163            assert_eq!(result.rows.len(), 1);
5164            // First column is the type name
5165            assert_eq!(result.rows[0][0], Value::from("Person"));
5166        }
5167
5168        #[test]
5169        fn test_show_edge_types() {
5170            let db = GrafeoDB::new_in_memory();
5171            let session = db.session();
5172
5173            session
5174                .execute("CREATE EDGE TYPE KNOWS CONNECTING (Person) TO (Person) (since INTEGER)")
5175                .unwrap();
5176
5177            let result = session.execute("SHOW EDGE TYPES").unwrap();
5178            assert_eq!(
5179                result.columns,
5180                vec!["name", "properties", "source_types", "target_types"]
5181            );
5182            assert_eq!(result.rows.len(), 1);
5183            assert_eq!(result.rows[0][0], Value::from("KNOWS"));
5184        }
5185
5186        #[test]
5187        fn test_show_graph_types() {
5188            let db = GrafeoDB::new_in_memory();
5189            let session = db.session();
5190
5191            session
5192                .execute("CREATE NODE TYPE Person (name STRING)")
5193                .unwrap();
5194            session
5195                .execute(
5196                    "CREATE GRAPH TYPE social (\
5197                        NODE TYPE Person (name STRING)\
5198                    )",
5199                )
5200                .unwrap();
5201
5202            let result = session.execute("SHOW GRAPH TYPES").unwrap();
5203            assert_eq!(
5204                result.columns,
5205                vec!["name", "open", "node_types", "edge_types"]
5206            );
5207            assert_eq!(result.rows.len(), 1);
5208            assert_eq!(result.rows[0][0], Value::from("social"));
5209        }
5210
5211        #[test]
5212        fn test_show_graph_type_named() {
5213            let db = GrafeoDB::new_in_memory();
5214            let session = db.session();
5215
5216            session
5217                .execute("CREATE NODE TYPE Person (name STRING)")
5218                .unwrap();
5219            session
5220                .execute(
5221                    "CREATE GRAPH TYPE social (\
5222                        NODE TYPE Person (name STRING)\
5223                    )",
5224                )
5225                .unwrap();
5226
5227            let result = session.execute("SHOW GRAPH TYPE social").unwrap();
5228            assert_eq!(result.rows.len(), 1);
5229            assert_eq!(result.rows[0][0], Value::from("social"));
5230        }
5231
5232        #[test]
5233        fn test_show_graph_type_not_found() {
5234            let db = GrafeoDB::new_in_memory();
5235            let session = db.session();
5236
5237            let result = session.execute("SHOW GRAPH TYPE nonexistent");
5238            assert!(result.is_err());
5239        }
5240
5241        #[test]
5242        fn test_show_indexes_via_gql() {
5243            let db = GrafeoDB::new_in_memory();
5244            let session = db.session();
5245
5246            let result = session.execute("SHOW INDEXES").unwrap();
5247            assert_eq!(result.columns, vec!["name", "type", "label", "property"]);
5248        }
5249
5250        #[test]
5251        fn test_show_constraints_via_gql() {
5252            let db = GrafeoDB::new_in_memory();
5253            let session = db.session();
5254
5255            let result = session.execute("SHOW CONSTRAINTS").unwrap();
5256            assert_eq!(result.columns, vec!["name", "type", "label", "properties"]);
5257        }
5258
5259        #[test]
5260        fn test_pattern_form_graph_type_roundtrip() {
5261            let db = GrafeoDB::new_in_memory();
5262            let session = db.session();
5263
5264            // Register the types first
5265            session
5266                .execute("CREATE NODE TYPE Person (name STRING NOT NULL)")
5267                .unwrap();
5268            session
5269                .execute("CREATE NODE TYPE City (name STRING)")
5270                .unwrap();
5271            session
5272                .execute("CREATE EDGE TYPE KNOWS (since INTEGER)")
5273                .unwrap();
5274            session.execute("CREATE EDGE TYPE LIVES_IN").unwrap();
5275
5276            // Create graph type using pattern form
5277            session
5278                .execute(
5279                    "CREATE GRAPH TYPE social (\
5280                        (:Person {name STRING NOT NULL})-[:KNOWS {since INTEGER}]->(:Person),\
5281                        (:Person)-[:LIVES_IN]->(:City)\
5282                    )",
5283                )
5284                .unwrap();
5285
5286            // Verify it was created
5287            let result = session.execute("SHOW GRAPH TYPE social").unwrap();
5288            assert_eq!(result.rows.len(), 1);
5289            assert_eq!(result.rows[0][0], Value::from("social"));
5290        }
5291    }
5292}